/*
 * Decompiled with CFR 0.152.
 */
package bt.torrent.data;

import bt.data.ChunkDescriptor;
import bt.data.ChunkVerifier;
import bt.data.DataDescriptor;
import bt.net.Peer;
import bt.service.IRuntimeLifecycleBinder;
import bt.torrent.data.BlockRead;
import bt.torrent.data.BlockWrite;
import bt.torrent.data.DataWorker;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DefaultDataWorker
implements DataWorker {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultDataWorker.class);
    private DataDescriptor data;
    private ChunkVerifier verifier;
    private final ExecutorService executor;
    private final int maxPendingTasks;
    private final AtomicInteger pendingTasksCount;

    public DefaultDataWorker(IRuntimeLifecycleBinder lifecycleBinder, DataDescriptor data, ChunkVerifier verifier, int maxQueueLength) {
        this.data = data;
        this.verifier = verifier;
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory(){
            private AtomicInteger i = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "bt.torrent.data.worker-" + this.i.incrementAndGet());
            }
        });
        this.maxPendingTasks = maxQueueLength;
        this.pendingTasksCount = new AtomicInteger();
        lifecycleBinder.onShutdown("Shutdown data worker for descriptor: " + data, this.executor::shutdownNow);
    }

    @Override
    public CompletableFuture<BlockRead> addBlockRequest(Peer peer, int pieceIndex, int offset, int length) {
        if (this.pendingTasksCount.get() >= this.maxPendingTasks) {
            LOGGER.warn("Can't accept read block request from peer (" + peer + ") -- queue is full");
            return CompletableFuture.completedFuture(BlockRead.rejected(peer, pieceIndex, offset));
        }
        this.pendingTasksCount.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> {
            try {
                ChunkDescriptor chunk = this.data.getChunkDescriptors().get(pieceIndex);
                byte[] block = chunk.getData().getSubrange(offset, length).getBytes();
                BlockRead blockRead = BlockRead.complete(peer, pieceIndex, offset, block);
                return blockRead;
            }
            catch (Throwable e) {
                BlockRead blockRead = BlockRead.exceptional(peer, e, pieceIndex, offset);
                return blockRead;
            }
            finally {
                this.pendingTasksCount.decrementAndGet();
            }
        }, this.executor);
    }

    @Override
    public CompletableFuture<BlockWrite> addBlock(Peer peer, int pieceIndex, int offset, byte[] block) {
        if (this.pendingTasksCount.get() >= this.maxPendingTasks) {
            LOGGER.warn("Can't accept write block request -- queue is full");
            return CompletableFuture.completedFuture(BlockWrite.rejected(peer, pieceIndex, offset, block));
        }
        this.pendingTasksCount.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> {
            try {
                if (this.data.getBitfield().isVerified(pieceIndex)) {
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.trace("Rejecting request to write block because the chunk is already complete and verified: piece index {" + pieceIndex + "}, offset {" + offset + "}, length {" + block.length + "}");
                    }
                    BlockWrite blockWrite = BlockWrite.rejected(peer, pieceIndex, offset, block);
                    return blockWrite;
                }
                ChunkDescriptor chunk = this.data.getChunkDescriptors().get(pieceIndex);
                chunk.getData().getSubrange(offset).putBytes(block);
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Successfully processed block: piece index {" + pieceIndex + "}, offset {" + offset + "}, length {" + block.length + "}");
                }
                CompletableFuture<Boolean> verificationFuture = null;
                if (chunk.isComplete()) {
                    verificationFuture = CompletableFuture.supplyAsync(() -> {
                        boolean verified = this.verifier.verify(chunk);
                        if (verified) {
                            this.data.getBitfield().markVerified(pieceIndex);
                        }
                        return verified;
                    }, this.executor);
                }
                BlockWrite blockWrite = BlockWrite.complete(peer, pieceIndex, offset, block, verificationFuture);
                return blockWrite;
            }
            catch (Throwable e) {
                BlockWrite blockWrite = BlockWrite.exceptional(peer, e, pieceIndex, offset, block);
                return blockWrite;
            }
            finally {
                this.pendingTasksCount.decrementAndGet();
            }
        }, this.executor);
    }
}

