/*
 * Decompiled with CFR 0.152.
 */
package com.twitter.elephantbird.mapreduce.input;

import com.twitter.elephantbird.util.HadoopCompat;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class LzoRecordReader<K, V>
extends RecordReader<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(LzoRecordReader.class);
    public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "elephantbird.mapred.input.bad.record.threshold";
    public static final String BAD_RECORD_MIN_COUNT_CONF_KEY = "elephantbird.mapred.input.bad.record.min";
    public static final String BAD_RECORD_CHECK_AT_CLOSE = "elephantbird.mapred.input.bad.record.check.only.in.close";
    protected long start_;
    protected long pos_;
    protected long end_;
    private FSDataInputStream fileIn_;
    protected InputErrorTracker errorTracker;

    public float getProgress() {
        if (this.start_ == this.end_) {
            return 0.0f;
        }
        return Math.min(1.0f, (float)(this.pos_ - this.start_) / (float)(this.end_ - this.start_));
    }

    public synchronized long getPos() throws IOException {
        return this.pos_;
    }

    public long getLzoFilePos() throws IOException {
        return this.fileIn_.getPos();
    }

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit)genericSplit;
        this.start_ = split.getStart();
        this.end_ = this.start_ + split.getLength();
        Path file = split.getPath();
        Configuration job = HadoopCompat.getConfiguration((JobContext)context);
        this.errorTracker = new InputErrorTracker(job);
        LOG.info("input split: " + file + " " + this.start_ + ":" + this.end_);
        FileSystem fs = file.getFileSystem(job);
        CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
        CompressionCodec codec = compressionCodecs.getCodec(file);
        if (codec == null) {
            throw new IOException("No codec for file " + file + " found, cannot run");
        }
        this.fileIn_ = fs.open(split.getPath());
        this.createInputReader((InputStream)codec.createInputStream((InputStream)this.fileIn_), job);
        if (this.start_ != 0L) {
            this.fileIn_.seek(this.start_);
            this.skipToNextSyncPoint(false);
            this.start_ = this.fileIn_.getPos();
            LOG.info("Start is now " + this.start_);
        } else {
            this.skipToNextSyncPoint(true);
        }
        this.pos_ = this.start_;
    }

    public void close() throws IOException {
        if (this.errorTracker != null) {
            this.errorTracker.close();
        }
    }

    protected abstract void createInputReader(InputStream var1, Configuration var2) throws IOException;

    protected abstract void skipToNextSyncPoint(boolean var1) throws IOException;

    static class InputErrorTracker
    implements Closeable {
        long numRecords;
        long numErrors;
        final double errorThreshold;
        final long minErrors;
        final boolean checkOnlyInClose;

        InputErrorTracker(Configuration conf) {
            this.errorThreshold = conf.getFloat(LzoRecordReader.BAD_RECORD_THRESHOLD_CONF_KEY, 1.0E-4f);
            this.minErrors = conf.getLong(LzoRecordReader.BAD_RECORD_MIN_COUNT_CONF_KEY, 2L);
            this.checkOnlyInClose = conf.getBoolean(LzoRecordReader.BAD_RECORD_CHECK_AT_CLOSE, true);
            this.numRecords = 0L;
            this.numErrors = 0L;
        }

        void incRecords() {
            ++this.numRecords;
        }

        @Override
        public void close() throws IOException {
            try {
                if (this.numErrors > 0L) {
                    this.checkErrorThreshold(null);
                }
            }
            catch (RuntimeException e) {
                throw new IOException(e);
            }
        }

        void incErrors(Throwable cause) {
            ++this.numErrors;
            if (this.numErrors > this.numRecords) {
                throw new RuntimeException("Forgot to invoke incRecords()?");
            }
            if (!this.checkOnlyInClose) {
                this.checkErrorThreshold(cause);
            }
        }

        private void checkErrorThreshold(Throwable cause) {
            if (this.numErrors > 0L && this.errorThreshold <= 0.0) {
                throw new RuntimeException("error while reading input records", cause);
            }
            LOG.warn("Error while reading an input record (" + this.numErrors + " out of " + this.numRecords + " so far ): ", cause);
            double errRate = (double)this.numErrors / (double)this.numRecords;
            if (this.numErrors >= this.minErrors && errRate > this.errorThreshold) {
                LOG.error(this.numErrors + " out of " + this.numRecords + " crosses configured threshold (" + this.errorThreshold + ")");
                throw new RuntimeException("error rate while reading input records crossed threshold", cause);
            }
        }
    }
}

