/*
 * Decompiled with CFR 0.152.
 */
package proj.zoie.impl.indexing;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.LifeCycleCotrolledDataConsumer;
import proj.zoie.api.ZoieException;
import proj.zoie.api.ZoieHealth;
import proj.zoie.impl.indexing.IndexingThread;

/**
 * A {@link DataConsumer} that buffers incoming events and hands them to a
 * delegate consumer on a background {@code ConsumerThread}.
 *
 * <p>Producers call {@link #consume(Collection)}, which appends events to an
 * in-memory batch and blocks while that batch already holds at least
 * {@link #getBatchSize()} events. The background thread repeatedly swaps the
 * batch out and passes it to the delegate set via
 * {@link #setDataConsumer(DataConsumer)}.
 *
 * <p>All mutable state is guarded by this object's monitor, which is also used
 * for the {@code wait}/{@code notifyAll} hand-off between producers, the
 * background thread and callers of {@link #syncWithVersion(long, String)}.
 *
 * @param <D> payload type carried by the data events
 */
public class AsyncDataConsumer<D> implements LifeCycleCotrolledDataConsumer<D> {
    private static final Logger log = Logger.getLogger(AsyncDataConsumer.class);

    /** Background worker; {@code null} until {@link #start()} has been called. */
    private volatile ConsumerThread _consumerThread;
    /** Delegate that receives each flushed batch; may be {@code null}. */
    private volatile DataConsumer<D> _consumer;
    /** Version recorded after the most recent flush; guarded by {@code this}. */
    private String _currentVersion = null;
    /** Highest version seen by {@link #consume(Collection)} so far. */
    private volatile String _bufferedVersion = null;
    private final Comparator<String> _versionComparator;
    /** Events accepted but not yet handed to the delegate; guarded by {@code this}. */
    private LinkedList<DataConsumer.DataEvent<D>> _batch;
    /** Threshold at which producers start blocking; guarded by {@code this}. */
    private int _batchSize;

    /**
     * Creates a consumer that is not yet running; call {@link #start()} to
     * launch the background thread.
     *
     * @param versionComparator ordering used to compare version strings
     */
    public AsyncDataConsumer(Comparator<String> versionComparator) {
        this._versionComparator = versionComparator;
        this._batch = new LinkedList<DataConsumer.DataEvent<D>>();
        this._batchSize = 1;
        this._consumerThread = null;
    }

    /**
     * Creates and starts the background consumer thread as a daemon.
     */
    @Override
    public void start() {
        // The field is assigned before the thread starts because flushBuffer(),
        // which runs on that thread, reads the field to find its stop flag.
        this._consumerThread = new ConsumerThread();
        this._consumerThread.setDaemon(true);
        this._consumerThread.start();
    }

    /**
     * Asks the background thread to terminate. Does nothing if
     * {@link #start()} was never called.
     */
    @Override
    public void stop() {
        ConsumerThread thread = this._consumerThread;
        if (thread != null) {
            thread.terminate();
        }
    }

    /**
     * Sets the delegate that receives flushed batches.
     *
     * @param consumer the delegate; if {@code null}, flushed batches are dropped
     */
    public void setDataConsumer(DataConsumer<D> consumer) {
        synchronized (this) {
            this._consumer = consumer;
        }
    }

    /**
     * Sets the number of buffered events at which producers start blocking.
     *
     * @param batchSize requested size; values below 1 are raised to 1
     */
    public void setBatchSize(int batchSize) {
        synchronized (this) {
            this._batchSize = Math.max(1, batchSize);
        }
    }

    /**
     * @return the configured batch size (always at least 1)
     */
    public int getBatchSize() {
        synchronized (this) {
            return this._batchSize;
        }
    }

    /**
     * @return the number of events currently buffered and not yet flushed
     */
    public int getCurrentBatchSize() {
        synchronized (this) {
            return this._batch != null ? this._batch.size() : 0;
        }
    }

    /**
     * @return the version recorded after the most recent flush, or
     *         {@code null} if nothing has been flushed yet
     */
    public String getCurrentVersion() {
        synchronized (this) {
            return this._currentVersion;
        }
    }

    /**
     * Waits until everything buffered so far has been flushed.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @throws ZoieException if the consumer is not running or the wait times out
     */
    public void flushEvents(long timeout) throws ZoieException {
        this.syncWithVersion(timeout, this._bufferedVersion);
    }

    /**
     * Blocks until the flushed version is no longer behind {@code version}
     * according to the version comparator, or until the timeout elapses.
     *
     * <p>If the calling thread is interrupted while waiting, the wait
     * continues, and the thread's interrupt status is restored before this
     * method returns or throws.
     *
     * @param timeInMillis maximum time to wait, in milliseconds
     * @param version      version to wait for; {@code null} returns immediately
     * @throws ZoieException if {@link #start()} has not been called, or if the
     *                       timeout elapses first
     */
    public void syncWithVersion(long timeInMillis, String version) throws ZoieException {
        if (this._consumerThread == null) {
            throw new ZoieException("not running");
        }
        if (version == null) {
            log.info("buffered version is NULL. Nothing to flush.");
            return;
        }
        boolean interrupted = false;
        try {
            synchronized (this) {
                long timeRemaining = Long.MAX_VALUE;
                while (this._currentVersion == null
                        || this._versionComparator.compare(this._currentVersion, version) < 0) {
                    if (log.isDebugEnabled()) {
                        if (timeRemaining > timeInMillis + 5000L) {
                            log.debug("syncWithVersion: timeRemaining: " + timeInMillis + "ms current: " + this._currentVersion + " expecting: " + version);
                        }
                        timeRemaining = timeInMillis;
                    }
                    // Wake the consumer thread in case it is idling in flushBuffer().
                    this.notifyAll();
                    long now1 = System.currentTimeMillis();
                    if (timeInMillis <= 0L) {
                        throw new ZoieException("sync timed out at current: " + this._currentVersion + " expecting: " + version);
                    }
                    try {
                        // Wait in slices of at most 5 seconds so the condition is re-checked regularly.
                        long waitTime = Math.min(5000L, timeInMillis);
                        this.wait(waitTime);
                    } catch (InterruptedException e) {
                        log.warn(e.getMessage(), e);
                        // Keep waiting, but remember the interrupt for the caller.
                        interrupted = true;
                    }
                    long now2 = System.currentTimeMillis();
                    timeInMillis -= now2 - now1;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends the given events to the buffer, blocking while the buffer already
     * holds at least {@link #getBatchSize()} events.
     *
     * <p>If the calling thread is interrupted while blocked, the wait
     * continues, and the thread's interrupt status is restored before this
     * method returns or throws.
     *
     * @param data events to buffer; {@code null} or empty is a no-op
     * @throws ZoieException if the buffer is full and the background thread is
     *                       missing, dead or stopping (health is also marked fatal)
     */
    @Override
    public void consume(Collection<DataConsumer.DataEvent<D>> data) throws ZoieException {
        if (data == null || data.size() == 0) {
            return;
        }
        boolean interrupted = false;
        try {
            synchronized (this) {
                while (this._batch.size() >= this._batchSize) {
                    ConsumerThread thread = this._consumerThread;
                    if (thread == null || !thread.isAlive() || thread._stop) {
                        // Nobody will ever drain the buffer, so waiting would hang forever.
                        ZoieHealth.setFatal();
                        throw new ZoieException("consumer thread has stopped");
                    }
                    try {
                        this.notifyAll();
                        this.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for space, but remember the interrupt for the caller.
                        interrupted = true;
                    }
                }
                for (DataConsumer.DataEvent<D> event : data) {
                    String eventVersion = event.getVersion();
                    String buffered = this._bufferedVersion;
                    // Track the highest version accepted so far.
                    if (buffered == null || this._versionComparator.compare(buffered, eventVersion) < 0) {
                        this._bufferedVersion = eventVersion;
                    }
                    this._batch.add(event);
                }
                if (log.isDebugEnabled()) {
                    log.debug("consume:receiving: buffered: " + this._bufferedVersion);
                }
                this.notifyAll();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Waits for buffered events, hands the whole batch to the delegate and then
     * records the new current version. Called in a loop by the background thread.
     *
     * <p>Returns without flushing if the buffer is empty and the consumer
     * thread is missing or has been asked to stop. An exception thrown by the
     * delegate is logged and the version is still advanced.
     */
    protected final void flushBuffer() {
        LinkedList<DataConsumer.DataEvent<D>> currentBatch;
        String version;
        String previousVersion;
        synchronized (this) {
            while (this._batch.size() == 0) {
                ConsumerThread thread = this._consumerThread;
                if (thread == null || thread._stop) {
                    return;
                }
                try {
                    this.notifyAll();
                    this.wait(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately swallowed: this runs on the worker thread, and
                    // re-asserting the flag would pass a pending interrupt into
                    // the delegate's consume() call below.
                }
            }
            previousVersion = this._currentVersion;
            String buffered = this._bufferedVersion;
            // The version never moves backwards: take the larger of current and buffered.
            if (previousVersion == null || this._versionComparator.compare(previousVersion, buffered) < 0) {
                version = buffered;
            } else {
                version = previousVersion;
            }
            currentBatch = this._batch;
            this._batch = new LinkedList<DataConsumer.DataEvent<D>>();
            // Producers blocked on a full buffer may proceed now.
            this.notifyAll();
        }
        if (log.isDebugEnabled()) {
            log.debug("flushBuffer: pre-flush: currentVersion: " + previousVersion + " processing version: " + version + " of size: " + currentBatch.size());
        }
        // The delegate is called outside the lock so producers are not blocked during indexing.
        DataConsumer<D> delegate = this._consumer;
        if (delegate != null) {
            try {
                delegate.consume(currentBatch);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
        synchronized (this) {
            this._currentVersion = version;
            if (log.isDebugEnabled()) {
                log.debug("flushBuffer: post-flush: currentVersion: " + this._currentVersion);
            }
            // Wake any thread waiting in syncWithVersion().
            this.notifyAll();
        }
    }

    /**
     * @return the highest version accepted by {@link #consume(Collection)} so
     *         far, or {@code null} if nothing has been consumed
     */
    @Override
    public String getVersion() {
        return this._bufferedVersion;
    }

    /**
     * @return the comparator supplied at construction time
     */
    @Override
    public Comparator<String> getVersionComparator() {
        return this._versionComparator;
    }

    /**
     * Background worker that repeatedly calls {@link #flushBuffer()} until it
     * is asked to terminate.
     */
    private final class ConsumerThread extends IndexingThread {
        /** Set by {@link #terminate()}; volatile because run() reads it without holding a lock. */
        volatile boolean _stop;

        ConsumerThread() {
            super("ConsumerThread");
            this._stop = false;
        }

        /**
         * Sets the stop flag and wakes every thread waiting on the enclosing
         * consumer's monitor so the request is noticed promptly.
         */
        public void terminate() {
            this._stop = true;
            synchronized (AsyncDataConsumer.this) {
                AsyncDataConsumer.this.notifyAll();
            }
        }

        @Override
        public void run() {
            while (!this._stop) {
                AsyncDataConsumer.this.flushBuffer();
            }
        }
    }
}

