/*
 * Decompiled with CFR 0.152.
 */
package bt.torrent.messaging;

import bt.data.Bitfield;
import bt.net.Peer;
import bt.protocol.Have;
import bt.protocol.Message;
import bt.protocol.Piece;
import bt.torrent.annotation.Consumes;
import bt.torrent.annotation.Produces;
import bt.torrent.data.BlockWrite;
import bt.torrent.data.DataWorker;
import bt.torrent.messaging.Assignment;
import bt.torrent.messaging.ConnectionState;
import bt.torrent.messaging.Mapper;
import bt.torrent.messaging.MessageContext;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PieceConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PieceConsumer.class);
    private Bitfield bitfield;
    private DataWorker dataWorker;
    private ConcurrentLinkedQueue<BlockWrite> completedBlocks;

    public PieceConsumer(Bitfield bitfield, DataWorker dataWorker) {
        this.bitfield = bitfield;
        this.dataWorker = dataWorker;
        this.completedBlocks = new ConcurrentLinkedQueue();
    }

    @Consumes
    public void consume(Piece piece, MessageContext context) {
        ConnectionState connectionState;
        Peer peer = context.getPeer();
        if (!this.checkBlockIsExpected(peer, connectionState = context.getConnectionState(), piece)) {
            return;
        }
        if (this.bitfield.isVerified(piece.getPieceIndex())) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Discarding received block because the chunk is already complete and verified: piece index {" + piece.getPieceIndex() + "}, offset {" + piece.getOffset() + "}, length {" + piece.getBlock().length + "}");
            }
            return;
        }
        this.addBlock(peer, connectionState, piece).whenComplete((block, error) -> {
            if (error != null) {
                throw new RuntimeException("Failed to perform request to write block", (Throwable)error);
            }
            if (block.getError().isPresent()) {
                throw new RuntimeException("Failed to perform request to write block", block.getError().get());
            }
            if (block.isRejected()) {
                if (LOGGER.isTraceEnabled()) {
                    LOGGER.trace("Request to write block could not be completed: " + piece);
                }
            } else {
                Optional<CompletableFuture<Boolean>> verificationFuture = block.getVerificationFuture();
                if (verificationFuture.isPresent()) {
                    verificationFuture.get().whenComplete((verified, error1) -> {
                        if (error1 != null) {
                            throw new RuntimeException("Failed to verify block", (Throwable)error1);
                        }
                        this.completedBlocks.add((BlockWrite)block);
                    });
                }
            }
        });
    }

    private boolean checkBlockIsExpected(Peer peer, ConnectionState connectionState, Piece piece) {
        Object key = Mapper.mapper().buildKey(piece.getPieceIndex(), piece.getOffset(), piece.getBlock().length);
        boolean expected = connectionState.getPendingRequests().remove(key);
        if (!expected && LOGGER.isTraceEnabled()) {
            LOGGER.trace("Discarding unexpected block {} from peer: {}", (Object)piece, (Object)peer);
        }
        return expected;
    }

    private CompletableFuture<BlockWrite> addBlock(Peer peer, ConnectionState connectionState, Piece piece) {
        Assignment assignment;
        int pieceIndex = piece.getPieceIndex();
        int offset = piece.getOffset();
        byte[] block = piece.getBlock();
        connectionState.incrementDownloaded(block.length);
        if (connectionState.getCurrentAssignment().isPresent() && pieceIndex == (assignment = connectionState.getCurrentAssignment().get()).getPiece()) {
            assignment.check();
        }
        CompletableFuture<BlockWrite> future = this.dataWorker.addBlock(peer, pieceIndex, offset, block);
        connectionState.getPendingWrites().put(Mapper.mapper().buildKey(pieceIndex, offset, block.length), future);
        return future;
    }

    @Produces
    public void produce(Consumer<Message> messageConsumer) {
        BlockWrite block;
        while ((block = this.completedBlocks.poll()) != null) {
            int pieceIndex = block.getPieceIndex();
            if (this.bitfield.getPieceStatus(pieceIndex) != Bitfield.PieceStatus.COMPLETE_VERIFIED) continue;
            messageConsumer.accept(new Have(pieceIndex));
        }
    }
}

