/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.tablestore.hadoop;

import com.alicloud.openservices.tablestore.SyncClientInterface;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
import com.alicloud.openservices.tablestore.model.BatchWriteRowRequest;
import com.alicloud.openservices.tablestore.model.BatchWriteRowResponse;
import com.alicloud.openservices.tablestore.model.Error;
import com.alicloud.openservices.tablestore.model.RowChange;
import com.aliyun.openservices.tablestore.hadoop.BatchWriteWritable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableStoreRecordWriter
extends RecordWriter<Writable, BatchWriteWritable> {
    private static final Logger logger = LoggerFactory.getLogger(TableStoreRecordWriter.class);
    private SyncClientInterface ots;
    private String outputTable;
    private long rowCounter = 0L;
    private Deque<RowChange> waitingRows = new ArrayDeque<RowChange>();
    private static int BATCH_SIZE_DEFAULT_MAX = 200;
    private static int BATCH_SIZE_INCR = 10;
    private static double BATCH_SIZE_BACKOFF = 0.8;
    private int batchSize = BATCH_SIZE_DEFAULT_MAX;
    private int maxBatchSize = BATCH_SIZE_DEFAULT_MAX;

    public TableStoreRecordWriter(SyncClientInterface ots, String outputTable) {
        Preconditions.checkNotNull((Object)ots, (Object)"ots client must be nonnull.");
        Preconditions.checkNotNull((Object)outputTable, (Object)"output table must be nonnull.");
        this.ots = ots;
        this.outputTable = outputTable;
        logger.debug("max batch size: {}", (Object)this.maxBatchSize);
    }

    public TableStoreRecordWriter(SyncClientInterface ots, String outputTable, int maxBatchSize) {
        Preconditions.checkNotNull((Object)ots, (Object)"ots client must be nonnull.");
        Preconditions.checkNotNull((Object)outputTable, (Object)"output table must be nonnull.");
        this.ots = ots;
        this.outputTable = outputTable;
        this.batchSize = maxBatchSize;
        this.maxBatchSize = maxBatchSize;
        logger.debug("max batch size: {}", (Object)this.maxBatchSize);
    }

    public int getMaxBatchSize() {
        return this.maxBatchSize;
    }

    public void write(Writable x, BatchWriteWritable batch) {
        List<RowChange> rows = batch.getRowChanges();
        for (RowChange row : rows) {
            this.waitingRows.addLast(row);
        }
        while (this.waitingRows.size() >= this.batchSize) {
            this.rowCounter += (long)this.batchWrite();
        }
    }

    public void close(TaskAttemptContext ctx) {
        while (!this.waitingRows.isEmpty()) {
            this.rowCounter += (long)this.batchWrite();
        }
        logger.info("this task wrote {} rows", (Object)this.rowCounter);
        this.ots.shutdown();
    }

    private int batchWrite() {
        block2: while (true) {
            Deque<RowChange> rows;
            if ((rows = this.prepareRows()).size() < this.batchSize) {
                logger.info("small batch size: {}", (Object)rows.size());
            } else {
                logger.debug("batch size: {}", (Object)rows.size());
            }
            try {
                this.launch(rows);
                this.batchSize += BATCH_SIZE_INCR;
                if (this.batchSize > this.maxBatchSize) {
                    this.batchSize = this.maxBatchSize;
                }
                return rows.size();
            }
            catch (TableStoreException ex) {
                if (ex.getErrorCode() != "OTSParameterInvalid" || this.batchSize == 1) {
                    throw ex;
                }
                logger.info("Batch-size backs off. current batch-size: {}", (Object)rows.size());
                this.batchSize = (int)((double)rows.size() * BATCH_SIZE_BACKOFF);
                if (this.batchSize < 1) {
                    this.batchSize = 1;
                }
                while (true) {
                    if (rows.isEmpty()) continue block2;
                    RowChange row = rows.pollLast();
                    this.waitingRows.addFirst(row);
                }
            }
            break;
        }
    }

    private Deque<RowChange> prepareRows() {
        RowChange row;
        int hash;
        ArrayDeque<RowChange> res = new ArrayDeque<RowChange>();
        HashSet<Integer> detectDupRows = new HashSet<Integer>();
        for (int rowCnt = 0; rowCnt < this.batchSize && !this.waitingRows.isEmpty() && !detectDupRows.contains(hash = (row = this.waitingRows.pollFirst()).getTableName().hashCode() * 1327144901 + row.getPrimaryKey().hashCode()); ++rowCnt) {
            res.addLast(row);
            detectDupRows.add(hash);
        }
        return res;
    }

    private void launch(Deque<RowChange> rows) {
        BatchWriteRowRequest req = new BatchWriteRowRequest();
        for (RowChange row : rows) {
            req.addRowChange(row);
        }
        BatchWriteRowResponse resp = this.ots.batchWriteRow(req);
        List failed = resp.getFailedRows();
        for (BatchWriteRowResponse.RowResult res : failed) {
            logger.error("fail to write to TableStore. table: {} error-code: {} error-message: {} request-id: {}", new Object[]{res.getTableName(), res.getError().getCode(), res.getError().getMessage(), resp.getRequestId()});
        }
        if (!failed.isEmpty()) {
            Error err = ((BatchWriteRowResponse.RowResult)failed.get(0)).getError();
            throw new TableStoreException(err.getMessage(), null, err.getCode(), resp.getRequestId(), 0);
        }
    }
}

