/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.writer;

import com.alicloud.openservices.tablestore.AsyncClientInterface;
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.TableStoreCallback;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.ConsumedCapacity;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.alicloud.openservices.tablestore.writer.DefaultWriterStatistics;
import com.alicloud.openservices.tablestore.writer.FlushCallback;
import com.alicloud.openservices.tablestore.writer.RowChangeEvent;
import com.alicloud.openservices.tablestore.writer.WriteRPCBuffer;
import com.alicloud.openservices.tablestore.writer.WriterConfig;
import com.lmax.disruptor.EventHandler;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowChangeEventHandler
implements EventHandler<RowChangeEvent> {
    private Logger logger = LoggerFactory.getLogger(RowChangeEventHandler.class);
    private AsyncClientInterface ots;
    private WriteRPCBuffer buffer;
    private int concurrency;
    private Semaphore semaphore;
    private TableStoreCallback<RowChange, ConsumedCapacity> callback;
    private Executor executor;
    private DefaultWriterStatistics writerStatistics;

    public RowChangeEventHandler(AsyncClientInterface ots, WriterConfig writerConfig, TableStoreCallback<RowChange, ConsumedCapacity> callback, Executor executor, DefaultWriterStatistics writerStatistics) {
        this.ots = ots;
        this.buffer = new WriteRPCBuffer(writerConfig);
        this.concurrency = writerConfig.getConcurrency();
        this.semaphore = new Semaphore(this.concurrency);
        this.callback = callback;
        this.executor = executor;
        this.writerStatistics = writerStatistics;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onEvent(RowChangeEvent rowChangeEvent, long sequence, boolean endOfBatch) throws Exception {
        BatchWriteRowRequest request = null;
        boolean shouldWaitFlush = false;
        ReentrantLock lock = null;
        Condition flushCond = null;
        if (rowChangeEvent.type == RowChangeEvent.EventType.FLUSH) {
            this.logger.debug("FlushSignal with QueueSize: {}", (Object)this.buffer.getTotalRowsCount());
            if (this.buffer.getTotalRowsCount() > 0) {
                request = this.buffer.makeRequest();
                this.buffer.clear();
            }
            shouldWaitFlush = true;
            lock = rowChangeEvent.lock;
            flushCond = rowChangeEvent.condition;
        } else {
            this.writerStatistics.totalRowsCount.incrementAndGet();
            final RowChange rowChange = rowChangeEvent.rowChange;
            boolean succeed = this.buffer.appendRowChange(rowChange);
            if (!succeed) {
                request = this.buffer.makeRequest();
                this.buffer.clear();
                succeed = this.buffer.appendRowChange(rowChange);
                if (!succeed) {
                    this.executor.execute(new Runnable(){

                        @Override
                        public void run() {
                            ((RowChangeEventHandler)RowChangeEventHandler.this).writerStatistics.totalFailedRowsCount.incrementAndGet();
                            RowChangeEventHandler.this.callback.onFailed(rowChange, new ClientException("Can not even append only one row into buffer."));
                        }
                    });
                }
            }
        }
        if (request != null) {
            this.semaphore.acquire();
            this.logger.debug("Acquire semaphore, start send async request.");
            final BatchWriteRowRequest finalRequest = request;
            this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    ((RowChangeEventHandler)RowChangeEventHandler.this).writerStatistics.totalRequestCount.incrementAndGet();
                    RowChangeEventHandler.this.ots.batchWriteRow(finalRequest, new FlushCallback<BatchWriteRowRequest, BatchWriteRowResponse>(RowChangeEventHandler.this.ots, new AtomicInteger(1), RowChangeEventHandler.this.semaphore, RowChangeEventHandler.this.callback, RowChangeEventHandler.this.executor, RowChangeEventHandler.this.writerStatistics));
                }
            });
        }
        if (shouldWaitFlush) {
            this.waitFlush();
            lock.lock();
            try {
                flushCond.signal();
            }
            finally {
                lock.unlock();
            }
        }
    }

    private void waitFlush() throws InterruptedException {
        this.logger.debug("Wait flush.");
        for (int i = 0; i < this.concurrency; ++i) {
            this.semaphore.acquire();
            this.logger.debug("Wait flush: {}, {}", (Object)i, (Object)this.concurrency);
        }
        this.semaphore.release(this.concurrency);
        this.logger.debug("Wait flush finished.");
    }
}

