/*
 * Decompiled with CFR 0.152.
 */
package bt.peerexchange;

import bt.BtException;
import bt.event.EventSource;
import bt.metainfo.TorrentId;
import bt.net.Peer;
import bt.peer.PeerSource;
import bt.peer.PeerSourceFactory;
import bt.peerexchange.PeerEvent;
import bt.peerexchange.PeerExchange;
import bt.peerexchange.PeerExchangeConfig;
import bt.peerexchange.PeerExchangePeerSource;
import bt.protocol.Message;
import bt.protocol.extended.ExtendedHandshake;
import bt.service.IRuntimeLifecycleBinder;
import bt.torrent.annotation.Consumes;
import bt.torrent.annotation.Produces;
import bt.torrent.messaging.MessageContext;
import com.google.inject.Inject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerExchangePeerSourceFactory
implements PeerSourceFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(PeerExchangePeerSourceFactory.class);
    private static final Duration CLEANER_INTERVAL = Duration.ofSeconds(37L);
    private Map<TorrentId, PeerExchangePeerSource> peerSources = new ConcurrentHashMap<TorrentId, PeerExchangePeerSource>();
    private Map<TorrentId, Queue<PeerEvent>> peerEvents = new ConcurrentHashMap<TorrentId, Queue<PeerEvent>>();
    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private Set<Peer> peers = ConcurrentHashMap.newKeySet();
    private Map<Peer, Long> lastSentPEXMessage = new ConcurrentHashMap<Peer, Long>();
    private Duration minMessageInterval;
    private int minEventsPerMessage;
    private int maxEventsPerMessage;

    @Inject
    public PeerExchangePeerSourceFactory(EventSource eventSource, IRuntimeLifecycleBinder lifecycleBinder, PeerExchangeConfig config) {
        this.minMessageInterval = config.getMinMessageInterval();
        this.minEventsPerMessage = config.getMinEventsPerMessage();
        this.maxEventsPerMessage = config.getMaxEventsPerMessage();
        eventSource.onPeerConnected(e -> this.onPeerConnected(e.getTorrentId(), e.getPeer())).onPeerDisconnected(e -> this.onPeerDisconnected(e.getTorrentId(), e.getPeer()));
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "bt.peerexchange.cleaner"));
        lifecycleBinder.onStartup("Schedule periodic cleanup of PEX messages", () -> executor.scheduleAtFixedRate(new Cleaner(), CLEANER_INTERVAL.toMillis(), CLEANER_INTERVAL.toMillis(), TimeUnit.MILLISECONDS));
        lifecycleBinder.onShutdown("Shutdown PEX cleanup scheduler", executor::shutdownNow);
    }

    private void onPeerConnected(TorrentId torrentId, Peer peer) {
        this.getPeerEvents(torrentId).add(PeerEvent.added(peer));
    }

    private void onPeerDisconnected(TorrentId torrentId, Peer peer) {
        this.getPeerEvents(torrentId).add(PeerEvent.dropped(peer));
        this.peers.remove(peer);
        this.lastSentPEXMessage.remove(peer);
    }

    private Queue<PeerEvent> getPeerEvents(TorrentId torrentId) {
        Queue<PeerEvent> existing;
        Queue<PeerEvent> events = this.peerEvents.get(torrentId);
        if (events == null && (existing = this.peerEvents.putIfAbsent(torrentId, events = new PriorityBlockingQueue<PeerEvent>())) != null) {
            events = existing;
        }
        return events;
    }

    @Override
    public PeerSource getPeerSource(TorrentId torrentId) {
        return this.getOrCreatePeerSource(torrentId);
    }

    private PeerExchangePeerSource getOrCreatePeerSource(TorrentId torrentId) {
        PeerExchangePeerSource existing;
        PeerExchangePeerSource peerSource = this.peerSources.get(torrentId);
        if (peerSource == null && (existing = this.peerSources.putIfAbsent(torrentId, peerSource = new PeerExchangePeerSource())) != null) {
            peerSource = existing;
        }
        return peerSource;
    }

    @Consumes
    public void consume(ExtendedHandshake handshake, MessageContext messageContext) {
        if (handshake.getSupportedMessageTypes().contains("ut_pex")) {
            this.peers.add(messageContext.getPeer());
        }
    }

    @Consumes
    public void consume(PeerExchange message, MessageContext messageContext) {
        messageContext.getTorrentId().ifPresent(torrentId -> this.getOrCreatePeerSource((TorrentId)torrentId).addMessage(message));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Produces
    public void produce(Consumer<Message> messageConsumer, MessageContext messageContext) {
        Peer peer = messageContext.getPeer();
        long currentTime = System.currentTimeMillis();
        long lastSentPEXMessageToPeer = this.lastSentPEXMessage.getOrDefault(peer, 0L);
        if (this.peers.contains(peer) && currentTime - lastSentPEXMessageToPeer >= this.minMessageInterval.toMillis()) {
            ArrayList<PeerEvent> events = new ArrayList<PeerEvent>();
            this.rwLock.readLock().lock();
            try {
                Queue<PeerEvent> torrentPeerEvents = this.getPeerEvents(messageContext.getTorrentId().get());
                for (PeerEvent event2 : torrentPeerEvents) {
                    if (event2.getInstant() - lastSentPEXMessageToPeer >= 0L && !event2.getPeer().equals(peer)) {
                        events.add(event2);
                    }
                    if (events.size() < this.maxEventsPerMessage) continue;
                    break;
                }
            }
            finally {
                this.rwLock.readLock().unlock();
            }
            if (events.size() >= this.minEventsPerMessage) {
                this.lastSentPEXMessage.put(peer, currentTime);
                PeerExchange.Builder messageBuilder = PeerExchange.builder();
                events.forEach(event -> {
                    switch (event.getType()) {
                        case ADDED: {
                            messageBuilder.added(event.getPeer());
                            break;
                        }
                        case DROPPED: {
                            messageBuilder.dropped(event.getPeer());
                            break;
                        }
                        default: {
                            throw new BtException("Unknown event type: " + event.getType().name());
                        }
                    }
                });
                messageConsumer.accept(messageBuilder.build());
            }
        }
    }

    private class Cleaner
    implements Runnable {
        private Cleaner() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            PeerExchangePeerSourceFactory.this.rwLock.writeLock().lock();
            try {
                long lruEventTime = PeerExchangePeerSourceFactory.this.lastSentPEXMessage.values().stream().reduce(Long.MAX_VALUE, (a, b) -> a < b ? a : b);
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Prior to cleaning events. LRU event time: {}, peer events: {}", (Object)lruEventTime, (Object)PeerExchangePeerSourceFactory.this.peerEvents);
                }
                for (Queue events : PeerExchangePeerSourceFactory.this.peerEvents.values()) {
                    PeerEvent event;
                    while ((event = (PeerEvent)events.peek()) != null && event.getInstant() <= lruEventTime) {
                        events.poll();
                    }
                }
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("After cleaning events. Peer events: {}", (Object)PeerExchangePeerSourceFactory.this.peerEvents);
                }
            }
            finally {
                PeerExchangePeerSourceFactory.this.rwLock.writeLock().unlock();
            }
        }
    }
}

