/*
 * Decompiled with CFR 0.152.
 */
package bt.net;

import bt.CountingThreadFactory;
import bt.event.EventSink;
import bt.metainfo.TorrentId;
import bt.net.IPeerConnectionPool;
import bt.net.Peer;
import bt.net.PeerConnection;
import bt.runtime.Config;
import bt.service.IRuntimeLifecycleBinder;
import com.google.inject.Inject;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerConnectionPool
implements IPeerConnectionPool {
    private static final Logger LOGGER = LoggerFactory.getLogger(PeerConnectionPool.class);
    private Config config;
    private EventSink eventSink;
    private ScheduledExecutorService cleaner;
    private Connections connections;
    private ReentrantLock cleanerLock;
    private Duration peerConnectionInactivityThreshold;

    @Inject
    public PeerConnectionPool(EventSink eventSink, IRuntimeLifecycleBinder lifecycleBinder, Config config) {
        this.config = config;
        this.eventSink = eventSink;
        this.peerConnectionInactivityThreshold = config.getPeerConnectionInactivityThreshold();
        this.connections = new Connections();
        this.cleanerLock = new ReentrantLock();
        this.cleaner = Executors.newScheduledThreadPool(1, r -> new Thread(r, "bt.net.pool.cleaner"));
        lifecycleBinder.onStartup("Schedule periodic cleanup of stale peer connections", () -> this.cleaner.scheduleAtFixedRate(new Cleaner(), 1L, 1L, TimeUnit.SECONDS));
        ExecutorService executor = Executors.newFixedThreadPool(config.getMaxPendingConnectionRequests(), CountingThreadFactory.daemonFactory("bt.net.pool.connection-worker"));
        lifecycleBinder.onShutdown("Shutdown outgoing connection request processor", executor::shutdownNow);
        lifecycleBinder.onShutdown("Shutdown connection pool", this::shutdown);
    }

    @Override
    public PeerConnection getConnection(Peer peer) {
        return this.connections.get(peer).orElse(null);
    }

    @Override
    public void visitConnections(TorrentId torrentId, Consumer<PeerConnection> visitor) {
        this.connections.visitConnections(torrentId, visitor);
    }

    @Override
    public int size() {
        return this.connections.count();
    }

    @Override
    public PeerConnection addConnectionIfAbsent(PeerConnection newConnection) {
        Peer peer = newConnection.getRemotePeer();
        if (!this.addConnection(newConnection)) {
            PeerConnection existingConnection = this.connections.get(peer).orElseThrow(() -> new RuntimeException("Failed to add new connection for peer: " + peer));
            if (existingConnection == null) {
                throw new RuntimeException("Failed to add new connection for peer: " + newConnection.getRemotePeer());
            }
            newConnection = existingConnection;
        }
        return newConnection;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean addConnection(PeerConnection newConnection) {
        boolean added = false;
        PeerConnection existingConnection = null;
        this.cleanerLock.lock();
        try {
            if (this.connections.count() >= this.config.getMaxPeerConnections()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Closing newly created connection with {} due to exceeding of connections limit", (Object)newConnection.getRemotePeer());
                }
                newConnection.closeQuietly();
            } else {
                existingConnection = this.connections.putIfAbsent(newConnection.getRemotePeer(), newConnection);
                added = existingConnection == null;
            }
        }
        finally {
            this.cleanerLock.unlock();
        }
        if (existingConnection != null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Connection already exists for peer: " + newConnection.getRemotePeer());
            }
            newConnection.closeQuietly();
            newConnection = existingConnection;
        }
        if (added) {
            this.eventSink.firePeerConnected(newConnection.getTorrentId(), newConnection.getRemotePeer());
        }
        return added;
    }

    private void purgeConnectionWithPeer(Peer peer) {
        PeerConnection purged = this.connections.remove(peer);
        if (purged != null) {
            if (!purged.isClosed()) {
                purged.closeQuietly();
            }
            this.eventSink.firePeerDisconnected(purged.getTorrentId(), peer);
        }
    }

    private void shutdown() {
        this.shutdownCleaner();
        this.connections.visitConnections(PeerConnection::closeQuietly);
    }

    private void shutdownCleaner() {
        this.cleaner.shutdown();
        try {
            this.cleaner.awaitTermination(1L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for the cleaner's shutdown");
        }
        if (!this.cleaner.isShutdown()) {
            this.cleaner.shutdownNow();
        }
    }

    private static class Connections {
        private ConcurrentMap<Peer, PeerConnection> connections = new ConcurrentHashMap<Peer, PeerConnection>();
        private ConcurrentMap<TorrentId, Collection<PeerConnection>> connectionsByTorrent = new ConcurrentHashMap<TorrentId, Collection<PeerConnection>>();

        Connections() {
        }

        int count() {
            return this.connections.size();
        }

        synchronized PeerConnection remove(Peer peer) {
            PeerConnection removed = (PeerConnection)this.connections.remove(peer);
            if (removed != null) {
                this.connectionsByTorrent.values().forEach(connections -> {
                    connections.remove(removed);
                    if (connections.isEmpty()) {
                        this.connectionsByTorrent.remove(removed.getTorrentId());
                    }
                });
            }
            return removed;
        }

        synchronized PeerConnection putIfAbsent(Peer peer, PeerConnection connection) {
            PeerConnection existing = this.connections.putIfAbsent(peer, connection);
            TorrentId torrentId = connection.getTorrentId();
            if (existing == null && torrentId != null) {
                this.connectionsByTorrent.computeIfAbsent(torrentId, id -> ConcurrentHashMap.newKeySet()).add(connection);
            }
            return existing;
        }

        Optional<PeerConnection> get(Peer peer) {
            return Optional.ofNullable(this.connections.get(peer));
        }

        void visitConnections(Consumer<PeerConnection> visitor) {
            this.connections.values().forEach(visitor::accept);
        }

        void visitConnections(TorrentId torrentId, Consumer<PeerConnection> visitor) {
            Collection connections = (Collection)this.connectionsByTorrent.get(torrentId);
            if (connections != null) {
                connections.forEach(visitor::accept);
            }
        }
    }

    private class Cleaner
    implements Runnable {
        private Cleaner() {
        }

        @Override
        public void run() {
            if (PeerConnectionPool.this.connections.count() == 0) {
                return;
            }
            PeerConnectionPool.this.cleanerLock.lock();
            try {
                PeerConnectionPool.this.connections.visitConnections(connection -> {
                    Peer peer = connection.getRemotePeer();
                    if (connection.isClosed()) {
                        PeerConnectionPool.this.purgeConnectionWithPeer(peer);
                    } else if (System.currentTimeMillis() - connection.getLastActive() >= PeerConnectionPool.this.peerConnectionInactivityThreshold.toMillis()) {
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("Removing inactive peer connection: {}", (Object)peer);
                        }
                        PeerConnectionPool.this.purgeConnectionWithPeer(peer);
                    }
                });
            }
            finally {
                PeerConnectionPool.this.cleanerLock.unlock();
            }
        }
    }
}

