/*
 * Decompiled with CFR 0.152.
 */
package proj.zoie.impl.indexing;

import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.DataProvider;
import proj.zoie.api.ZoieException;
import proj.zoie.mbean.DataProviderAdminMBean;

public abstract class StreamDataProvider<D>
implements DataProvider<D>,
DataProviderAdminMBean {
    private static final Logger log = Logger.getLogger(StreamDataProvider.class);
    private int _batchSize = 1;
    private DataConsumer<D> _consumer = null;
    private DataThread<D> _thread;
    private volatile int _retryTime = 100;
    protected final Comparator<String> _versionComparator;
    private volatile long _maxEventsPerMinute = Long.MAX_VALUE;
    private volatile long _maxVolatileTimeInMillis = Long.MAX_VALUE;

    public StreamDataProvider(Comparator<String> versionComparator) {
        this._versionComparator = versionComparator;
    }

    public void setRetryTime(int retryTime) {
        this._retryTime = retryTime;
    }

    public int getRetryTime() {
        return this._retryTime;
    }

    public void setDataConsumer(DataConsumer<D> consumer) {
        this._consumer = consumer;
    }

    public DataConsumer<D> getDataConsumer() {
        return this._consumer;
    }

    public abstract DataConsumer.DataEvent<D> next();

    public abstract void setStartingOffset(String var1);

    public abstract void reset();

    @Override
    public int getBatchSize() {
        return this._batchSize;
    }

    @Override
    public long getEventsPerMinute() {
        DataThread<D> thread = this._thread;
        if (thread == null) {
            return 0L;
        }
        return ((DataThread)thread).getEventsPerMinute();
    }

    @Override
    public long getMaxEventsPerMinute() {
        return this._maxEventsPerMinute;
    }

    @Override
    public void setMaxEventsPerMinute(long maxEventsPerMinute) {
        this._maxEventsPerMinute = maxEventsPerMinute;
        DataThread<D> thread = this._thread;
        if (thread == null) {
            return;
        }
        ((DataThread)thread).setMaxEventsPerMinute(this._maxEventsPerMinute);
    }

    public void setMaxVolatileTime(long timeInMillis) {
        this._maxVolatileTimeInMillis = timeInMillis;
        DataThread<D> thread = this._thread;
        if (thread == null) {
            return;
        }
        ((DataThread)thread).setMaxVolatileTime(this._maxVolatileTimeInMillis);
    }

    @Override
    public String getStatus() {
        DataThread<D> thread = this._thread;
        if (thread == null) {
            return "dead";
        }
        return ((DataThread)thread).getStatus() + " : " + (Object)((Object)thread.getState());
    }

    @Override
    public void pause() {
        if (this._thread != null) {
            this._thread.pauseDataFeed();
        }
    }

    @Override
    public void resume() {
        if (this._thread != null) {
            this._thread.resumeDataFeed();
        }
    }

    @Override
    public void setBatchSize(int batchSize) {
        this._batchSize = Math.max(1, batchSize);
    }

    @Override
    public long getEventCount() {
        DataThread<D> thread = this._thread;
        if (thread != null) {
            return ((DataThread)this._thread).getEventCount();
        }
        return 0L;
    }

    @Override
    public void stop() {
        if (this._thread != null && this._thread.isAlive()) {
            this._thread.terminate();
            try {
                this._thread.join();
            }
            catch (InterruptedException e) {
                log.warn((Object)"stopping interrupted");
            }
        }
    }

    @Override
    public void start() {
        if (this._thread == null || !this._thread.isAlive()) {
            this.reset();
            this._thread = new DataThread(this);
            ((DataThread)this._thread).setMaxEventsPerMinute(this._maxEventsPerMinute);
            ((DataThread)this._thread).setMaxVolatileTime(this._maxVolatileTimeInMillis);
            this._thread.start();
        }
    }

    public void syncWithVersion(long timeInMillis, String version) throws ZoieException {
        this._thread.syncWithVersion(timeInMillis, version);
    }

    private static final class DataThread<D>
    extends Thread {
        private Collection<DataConsumer.DataEvent<D>> _batch;
        private volatile String _currentVersion;
        private final StreamDataProvider<D> _dataProvider;
        private volatile boolean _paused;
        private volatile boolean _stop;
        private AtomicLong _eventCount = new AtomicLong(0L);
        private volatile long _throttle = 40000L;
        private volatile long _maxVolatileTimeInMillis = Long.MAX_VALUE;
        private volatile long _lastFlushTime = System.currentTimeMillis();
        private boolean _flushing = false;
        private final Comparator<String> _versionComparator;
        private long lastcount = 0L;
        private long[] last60 = new long[60];
        private long[] last60slots = new long[60];
        private volatile int currentslot = 0;
        private static final int window = 3;

        private void resetEventTimer() {
            this._eventCount.set(0L);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private String getStatus() {
            DataThread dataThread = this;
            synchronized (dataThread) {
                if (this._stop) {
                    return "stopped";
                }
                if (this._paused) {
                    return "paused";
                }
                return "running";
            }
        }

        DataThread(StreamDataProvider<D> dataProvider) {
            super("Stream DataThread");
            this.setDaemon(false);
            this._dataProvider = dataProvider;
            this._currentVersion = null;
            this._paused = false;
            this._stop = false;
            this._batch = new LinkedList<DataConsumer.DataEvent<D>>();
            this._versionComparator = dataProvider._versionComparator;
        }

        @Override
        public void start() {
            super.start();
            this.resetEventTimer();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void terminate() {
            this._stop = true;
            DataThread dataThread = this;
            synchronized (dataThread) {
                this.notifyAll();
            }
        }

        void pauseDataFeed() {
            this._paused = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void resumeDataFeed() {
            DataThread dataThread = this;
            synchronized (dataThread) {
                this._paused = false;
                this.resetEventTimer();
                this.notifyAll();
            }
        }

        private void flush() {
            Collection<DataConsumer.DataEvent<D>> tmp = this._batch;
            this._batch = new LinkedList<DataConsumer.DataEvent<D>>();
            try {
                if (((StreamDataProvider)this._dataProvider)._consumer != null) {
                    int batchSize = tmp.size();
                    ((StreamDataProvider)this._dataProvider)._consumer.consume(tmp);
                    this._eventCount.getAndAdd(batchSize);
                    this.updateStats();
                }
            }
            catch (ZoieException e) {
                log.error((Object)e.getMessage(), (Throwable)e);
            }
            this._lastFlushTime = System.currentTimeMillis();
        }

        private synchronized void updateStats() {
            long newcount = this._eventCount.get();
            long count = newcount - this.lastcount;
            this.lastcount = newcount;
            long now = System.nanoTime();
            if (now - this.last60slots[this.currentslot] > 1000000000L) {
                this.currentslot = (this.currentslot + 1) % this.last60.length;
                this.last60slots[this.currentslot] = now;
                this.last60[this.currentslot] = 0L;
            }
            int n = this.currentslot;
            this.last60[n] = this.last60[n] + count;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void syncWithVersion(long timeInMillis, String version) throws ZoieException {
            if (version == null) {
                return;
            }
            long now = System.currentTimeMillis();
            long due = now + timeInMillis;
            DataThread dataThread = this;
            synchronized (dataThread) {
                try {
                    while (this._currentVersion == null || this._versionComparator.compare(this._currentVersion, version) < 0) {
                        if (now >= due) {
                            throw new ZoieException("sync timed out");
                        }
                        try {
                            this.notifyAll();
                            this._flushing = true;
                            this.wait(Math.min(due - now, 200L));
                        }
                        catch (InterruptedException e) {
                            log.warn((Object)e.getMessage(), (Throwable)e);
                        }
                        now = System.currentTimeMillis();
                    }
                }
                finally {
                    this._flushing = false;
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            String version = this._currentVersion;
            while (!this._stop) {
                DataThread dataThread;
                this.updateStats();
                DataThread dataThread2 = this;
                synchronized (dataThread2) {
                    while (!this._stop && (this._paused || this.getEventsPerMinute() > this._throttle)) {
                        try {
                            this.wait(500L);
                        }
                        catch (InterruptedException e) {
                            Thread.interrupted();
                            continue;
                        }
                        this.updateStats();
                    }
                }
                if (this._stop) continue;
                DataConsumer.DataEvent<D> data = this._dataProvider.next();
                if (data != null) {
                    version = this._versionComparator.compare(version, data.getVersion()) >= 0 ? version : data.getVersion();
                    dataThread = this;
                    synchronized (dataThread) {
                        this._batch.add(data);
                        if (this._batch.size() >= ((StreamDataProvider)this._dataProvider)._batchSize || this._flushing || System.currentTimeMillis() - this._lastFlushTime > this._maxVolatileTimeInMillis) {
                            this.flush();
                            this._currentVersion = version;
                            this.notifyAll();
                        }
                        continue;
                    }
                }
                dataThread = this;
                synchronized (dataThread) {
                    if (this._flushing || this._batch.size() > 0) {
                        this.flush();
                        this._currentVersion = version;
                    }
                    this.notifyAll();
                    try {
                        this.wait(this._dataProvider.getRetryTime());
                    }
                    catch (InterruptedException e) {
                        Thread.interrupted();
                    }
                }
            }
            this.flush();
        }

        private long getEventCount() {
            return this._eventCount.get();
        }

        private long getEventsPerMinute() {
            int slot = this.currentslot;
            long countwindow = 0L;
            long count = 0L;
            for (int i = 0; i < 60; ++i) {
                int id = (slot - i + 60) % 60;
                if (i < 3) {
                    countwindow += this.last60[id];
                }
                count += this.last60[id];
            }
            return Math.max(countwindow * 60L / 3L, count);
        }

        private void setMaxEventsPerMinute(long maxEventsPerMinute) {
            this._throttle = maxEventsPerMinute;
        }

        private void setMaxVolatileTime(long timeInMillis) {
            this._maxVolatileTimeInMillis = timeInMillis;
        }
    }
}

