/*
 * Decompiled with CFR 0.152.
 */
package bt.net;

import bt.net.IMessageDispatcher;
import bt.net.IPeerConnectionPool;
import bt.net.Peer;
import bt.net.PeerConnection;
import bt.protocol.Message;
import bt.runtime.Config;
import bt.service.IRuntimeLifecycleBinder;
import bt.torrent.TorrentRegistry;
import com.google.inject.Inject;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageDispatcher
implements IMessageDispatcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageDispatcher.class);
    private final Map<Peer, Collection<Consumer<Message>>> consumers = new ConcurrentHashMap<Peer, Collection<Consumer<Message>>>();
    private final Map<Peer, Collection<Supplier<Message>>> suppliers = new ConcurrentHashMap<Peer, Collection<Supplier<Message>>>();
    private TorrentRegistry torrentRegistry;

    @Inject
    public MessageDispatcher(IRuntimeLifecycleBinder lifecycleBinder, IPeerConnectionPool pool, TorrentRegistry torrentRegistry, Config config) {
        this.torrentRegistry = torrentRegistry;
        this.initializeMessageLoop(lifecycleBinder, pool, config);
    }

    private void initializeMessageLoop(IRuntimeLifecycleBinder lifecycleBinder, IPeerConnectionPool pool, Config config) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "bt.net.message-dispatcher"));
        LoopControl loopControl = new LoopControl(config.getMaxMessageProcessingInterval().toMillis());
        MessageDispatchingLoop loop = new MessageDispatchingLoop(pool, loopControl);
        lifecycleBinder.onStartup("Initialize message dispatcher", () -> executor.execute(loop));
        lifecycleBinder.onShutdown("Shutdown message dispatcher", () -> {
            try {
                loop.shutdown();
            }
            finally {
                executor.shutdownNow();
            }
        });
    }

    @Override
    public synchronized void addMessageConsumer(Peer sender, Consumer<Message> messageConsumer) {
        Collection<Consumer<Message>> peerConsumers = this.consumers.get(sender);
        if (peerConsumers == null) {
            peerConsumers = ConcurrentHashMap.newKeySet();
            this.consumers.put(sender, peerConsumers);
        }
        peerConsumers.add(messageConsumer);
    }

    @Override
    public synchronized void addMessageSupplier(Peer recipient, Supplier<Message> messageSupplier) {
        Collection<Supplier<Message>> peerSuppliers = this.suppliers.get(recipient);
        if (peerSuppliers == null) {
            peerSuppliers = ConcurrentHashMap.newKeySet();
            this.suppliers.put(recipient, peerSuppliers);
        }
        peerSuppliers.add(messageSupplier);
    }

    private static class LoopControl {
        private long maxTimeToSleep;
        private int messagesProcessed;
        private long timeToSleep;

        LoopControl(long maxTimeToSleep) {
            this.maxTimeToSleep = maxTimeToSleep;
            this.reset();
        }

        private void reset() {
            this.messagesProcessed = 0;
            this.timeToSleep = 1L;
        }

        void incrementProcessed() {
            ++this.messagesProcessed;
        }

        synchronized void iterationFinished() {
            if (this.messagesProcessed > 0) {
                this.reset();
            } else {
                try {
                    this.wait(this.timeToSleep);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException("Unexpectedly interrupted", e);
                }
                this.timeToSleep = this.timeToSleep < this.maxTimeToSleep ? Math.min(this.timeToSleep << 1, this.maxTimeToSleep) : this.maxTimeToSleep;
            }
        }
    }

    private class MessageDispatchingLoop
    implements Runnable {
        private final IPeerConnectionPool pool;
        private final LoopControl loopControl;
        private volatile boolean shutdown;

        MessageDispatchingLoop(IPeerConnectionPool pool, LoopControl loopControl) {
            this.pool = pool;
            this.loopControl = loopControl;
        }

        @Override
        public void run() {
            while (!this.shutdown) {
                PeerConnection connection;
                Peer peer;
                if (!MessageDispatcher.this.consumers.isEmpty()) {
                    block9: for (Map.Entry entry : MessageDispatcher.this.consumers.entrySet()) {
                        peer = (Peer)entry.getKey();
                        Collection consumers = (Collection)entry.getValue();
                        connection = this.pool.getConnection(peer);
                        if (connection == null || connection.isClosed() || !MessageDispatcher.this.torrentRegistry.isSupportedAndActive(connection.getTorrentId())) continue;
                        block10: while (true) {
                            Message message;
                            try {
                                message = connection.readMessageNow();
                            }
                            catch (Exception e) {
                                LOGGER.error("Error when reading message from peer connection: " + peer, (Throwable)e);
                                continue block9;
                            }
                            if (message == null) continue block9;
                            this.loopControl.incrementProcessed();
                            Iterator e = consumers.iterator();
                            while (true) {
                                if (!e.hasNext()) continue block10;
                                Consumer consumer = (Consumer)e.next();
                                try {
                                    consumer.accept(message);
                                }
                                catch (Exception e2) {
                                    LOGGER.warn("Error in message consumer", (Throwable)e2);
                                }
                            }
                            break;
                        }
                    }
                }
                if (!MessageDispatcher.this.suppliers.isEmpty()) {
                    for (Map.Entry entry : MessageDispatcher.this.suppliers.entrySet()) {
                        peer = (Peer)entry.getKey();
                        Collection suppliers = (Collection)entry.getValue();
                        connection = this.pool.getConnection(peer);
                        if (connection == null || connection.isClosed() || !MessageDispatcher.this.torrentRegistry.isSupportedAndActive(connection.getTorrentId())) continue;
                        for (Supplier messageSupplier : suppliers) {
                            Message message = null;
                            try {
                                message = (Message)messageSupplier.get();
                            }
                            catch (Exception e) {
                                LOGGER.warn("Error in message supplier", (Throwable)e);
                            }
                            if (message == null) continue;
                            this.loopControl.incrementProcessed();
                            try {
                                connection.postMessage(message);
                            }
                            catch (Exception e) {
                                LOGGER.error("Error when writing message", (Throwable)e);
                            }
                        }
                    }
                }
                this.loopControl.iterationFinished();
            }
        }

        public void shutdown() {
            this.shutdown = true;
        }
    }
}

