/*
 * Decompiled with CFR 0.152.
 */
package proj.zoie.impl.indexing.luceneNRT;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Version;
import proj.zoie.api.DataConsumer;
import proj.zoie.api.IndexReaderFactory;
import proj.zoie.api.LifeCycleCotrolledDataConsumer;
import proj.zoie.api.ZoieException;
import proj.zoie.api.indexing.ZoieIndexable;
import proj.zoie.api.indexing.ZoieIndexableInterpreter;

/**
 * Data consumer that indexes events through a single Lucene {@link IndexWriter} and
 * exposes near-real-time readers that are refreshed on a fixed interval by a background
 * thread instead of on every request.
 *
 * <p>Every indexed document gets an extra, un-stored field named
 * {@link #DOCUMENT_ID_FIELD} holding the indexable's UID; unless the consumer is in
 * append-only mode, documents carrying the same UID are deleted before the new ones
 * are added.
 *
 * <p>Readers replaced by a refresh are kept in a small FIFO of retired readers; once
 * that FIFO reaches {@code MAX_READER_GENERATION} entries the oldest ones are closed.
 *
 * @param <D> type of the raw data events handed to {@link #consume(Collection)}
 */
public class ThrottledLuceneNRTDataConsumer<D>
implements LifeCycleCotrolledDataConsumer<D>,
IndexReaderFactory<IndexReader> {
    private static final Logger logger = Logger.getLogger(ThrottledLuceneNRTDataConsumer.class);

    /** Retired readers are closed, oldest first, while this many (or more) are retained. */
    private static final int MAX_READER_GENERATION = 3;

    /** Name of the field that stores the indexable UID on every document. */
    public static final String DOCUMENT_ID_FIELD = "id";

    private IndexWriter _writer = null;
    private final Analyzer _analyzer;
    private final ZoieIndexableInterpreter<D> _interpreter;
    private final Directory _dir;

    /** Interval, in milliseconds, the reopen thread waits between reader refreshes. */
    private final long _throttleFactor;

    // Written by the reopen thread and read by caller threads, hence volatile
    // (like the two version fields below).
    private volatile IndexReader _currentReader;
    private final ReopenThread _reopenThread;

    // Retired readers: the set gives a fast membership test, the queue keeps retirement
    // order. Both are only modified while holding the monitor of _returnSet.
    private final HashSet<IndexReader> _returnSet = new HashSet<IndexReader>();
    private final ConcurrentLinkedQueue<IndexReader> _returnList = new ConcurrentLinkedQueue<IndexReader>();

    private final MergePolicy _mergePolicy;
    private boolean _appendOnly = false;
    private volatile String _version = null;
    private volatile String _currentReaderVersion = null;

    /**
     * Creates a consumer; nothing is opened until {@link #start()} is called.
     *
     * @param dir            directory the index writer is opened on
     * @param analyzer       default analyzer, used when an indexing request supplies none
     * @param interpreter    converts raw data events into indexables
     * @param throttleFactor milliseconds between reader refreshes; must be positive
     * @param mergePolicy    merge policy for the writer, or {@code null} to keep the
     *                       writer configuration's default
     * @throws IllegalArgumentException if {@code throttleFactor} is not positive
     */
    public ThrottledLuceneNRTDataConsumer(Directory dir, Analyzer analyzer, ZoieIndexableInterpreter<D> interpreter, long throttleFactor, MergePolicy mergePolicy) {
        if (throttleFactor <= 0L) {
            throw new IllegalArgumentException("throttle factor must be > 0");
        }
        this._analyzer = analyzer;
        this._interpreter = interpreter;
        this._dir = dir;
        this._throttleFactor = throttleFactor;
        this._mergePolicy = mergePolicy;
        this._currentReader = null;
        this._reopenThread = new ReopenThread();
    }

    /**
     * @return {@code true} if documents are added without first deleting documents
     *         that carry the same UID
     */
    public boolean isAppendOnly() {
        return this._appendOnly;
    }

    /**
     * @param _appendOnly {@code true} to skip the delete-by-UID step in
     *                    {@link #consume(Collection)}
     */
    public void setAppendOnly(boolean _appendOnly) {
        this._appendOnly = _appendOnly;
    }

    /**
     * Opens the index writer and starts the background reopen thread. An
     * {@link IOException} while opening the writer is logged, not propagated; in that
     * case the writer stays {@code null} and the reopen thread is not started.
     */
    @Override
    public void start() {
        try {
            IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_34, this._analyzer);
            if (this._mergePolicy != null) {
                config.setMergePolicy(this._mergePolicy);
            }
            this._writer = new IndexWriter(this._dir, config);
            this._reopenThread.start();
        }
        catch (IOException e) {
            logger.error("uanble to start consumer: " + e.getMessage(), e);
        }
    }

    /**
     * Stops the reopen thread, then closes the current reader, every retired reader
     * that is still retained, and finally the writer. Failures while closing are
     * logged and do not prevent the remaining resources from being closed.
     */
    @Override
    public void stop() {
        this._reopenThread.terminate();
        try {
            // Wait for the reopen thread to finish so it cannot publish a new reader
            // (or touch the writer) while the resources below are being closed.
            this._reopenThread.join();
        }
        catch (InterruptedException e) {
            // Keep the caller's interrupt status and carry on with a best-effort shutdown.
            Thread.currentThread().interrupt();
        }

        IndexReader current = this._currentReader;
        if (current != null) {
            this.closeReader(current);
        }

        // Retired readers below the eviction threshold would otherwise never be closed.
        synchronized (this._returnSet) {
            IndexReader retired;
            while ((retired = this._returnList.poll()) != null) {
                this.closeReader(retired);
            }
            this._returnSet.clear();
        }

        if (this._writer != null) {
            try {
                this._writer.close();
            }
            catch (IOException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Indexes a batch of events. For each event the consumer records the event's
     * version, interprets the data, and - unless the indexable asks to be skipped -
     * deletes existing documents with the same UID (when not append-only) and adds one
     * document per indexing request.
     *
     * @param events events to index; a {@code null} or empty collection is a no-op
     * @throws ZoieException if the consumer has not been started successfully, or if the
     *                       writer fails with an {@link IOException} (kept as the cause)
     */
    @Override
    public void consume(Collection<DataConsumer.DataEvent<D>> events) throws ZoieException {
        if (this._writer == null) {
            throw new ZoieException("Internal IndexWriter null, perhaps not started?");
        }
        if (events == null || events.isEmpty()) {
            return;
        }
        for (DataConsumer.DataEvent<D> event : events) {
            this._version = event.getVersion();
            ZoieIndexable indexable = this._interpreter.convertAndInterpret(event.getData());
            if (indexable.isSkip()) {
                continue;
            }
            String uid = String.valueOf(indexable.getUID());
            if (!this._appendOnly) {
                try {
                    this._writer.deleteDocuments(new Term(DOCUMENT_ID_FIELD, uid));
                }
                catch (IOException e) {
                    throw new ZoieException(e.getMessage(), e);
                }
            }
            for (ZoieIndexable.IndexingReq req : indexable.buildIndexingReqs()) {
                Document doc = req.getDocument();
                Field uidField = new Field(DOCUMENT_ID_FIELD, uid, Field.Store.NO, Field.Index.NOT_ANALYZED_NO_NORMS);
                uidField.setOmitNorms(true);
                doc.add(uidField);
                // Fall back to the consumer-wide analyzer when the request brings none.
                Analyzer localAnalyzer = req.getAnalyzer();
                if (localAnalyzer == null) {
                    localAnalyzer = this._analyzer;
                }
                try {
                    this._writer.addDocument(doc, localAnalyzer);
                }
                catch (IOException e) {
                    throw new ZoieException(e.getMessage(), e);
                }
            }
        }
    }

    /** @return the default analyzer passed to the constructor */
    @Override
    public Analyzer getAnalyzer() {
        return this._analyzer;
    }

    /**
     * @return the reader most recently opened by the reopen thread, or {@code null} if
     *         none has been opened yet
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public IndexReader getDiskIndexReader() throws IOException {
        return this._currentReader;
    }

    /**
     * @return the consumed version that was recorded when the current reader was
     *         opened, or {@code null} if no reader has been opened yet
     */
    @Override
    public String getCurrentReaderVersion() {
        return this._currentReaderVersion;
    }

    /**
     * @return a new list holding the current reader, or an empty list when no reader
     *         has been opened yet
     */
    @Override
    public List<IndexReader> getIndexReaders() throws IOException {
        IndexReader subReader = this.getDiskIndexReader();
        ArrayList<IndexReader> list = new ArrayList<IndexReader>();
        if (subReader != null) {
            list.add(subReader);
        }
        return list;
    }

    /**
     * Hands readers back to the consumer. The reader that is still current is left
     * untouched; every other reader is queued as retired.
     *
     * @param readers readers obtained from {@link #getIndexReaders()}; may be {@code null}
     */
    @Override
    public void returnIndexReaders(List<IndexReader> readers) {
        if (readers == null) {
            return;
        }
        IndexReader current = this._currentReader;
        for (IndexReader r : readers) {
            if (r != current) {
                this.returnReader(r);
            }
        }
    }

    /**
     * Registers a retired reader (once) and then closes the oldest retired readers
     * while {@code MAX_READER_GENERATION} or more are retained.
     */
    private void returnReader(IndexReader reader) {
        synchronized (this._returnSet) {
            // Set.add reports whether the reader was new, so each reader is queued once.
            if (this._returnSet.add(reader)) {
                this._returnList.add(reader);
            }
            while (this._returnList.size() >= MAX_READER_GENERATION) {
                logger.info("remove and close old reader: " + this._returnList.size() + "/" + this._returnSet.size());
                IndexReader oldest = this._returnList.remove();
                this._returnSet.remove(oldest);
                this.closeReader(oldest);
            }
        }
    }

    /** Closes a reader, logging (not propagating) any {@link IOException}. */
    private void closeReader(IndexReader reader) {
        try {
            reader.close();
        }
        catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /** @return the version of the event most recently passed through {@link #consume(Collection)} */
    @Override
    public String getVersion() {
        return this._version;
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Comparator<String> getVersionComparator() {
        throw new UnsupportedOperationException();
    }

    /**
     * Daemon thread that, every {@code _throttleFactor} milliseconds, opens a fresh
     * near-real-time reader from the writer, publishes it as the current reader and
     * retires the previous one.
     */
    private class ReopenThread
    extends Thread {
        private volatile boolean _stop;

        ReopenThread() {
            super("reopen thread");
            this.setDaemon(true);
            this._stop = false;
        }

        /** Requests the loop to end and interrupts the thread in case it is waiting. */
        void terminate() {
            if (!this._stop) {
                this._stop = true;
                this.interrupt();
            }
        }

        @Override
        public void run() {
            while (!this._stop) {
                synchronized (this) {
                    try {
                        this.wait(ThrottledLuceneNRTDataConsumer.this._throttleFactor);
                    }
                    catch (InterruptedException e) {
                        // Interrupted (normally by terminate()): re-evaluate the stop flag.
                        continue;
                    }
                }
                if (this._stop) {
                    // Stop was requested while waiting: do not open another reader.
                    break;
                }
                IndexWriter writer = ThrottledLuceneNRTDataConsumer.this._writer;
                if (writer == null) {
                    continue;
                }
                try {
                    logger.info("updating reader...");
                    // Sample the version before opening so that events consumed while the
                    // reader is being opened are not attributed to it.
                    String versionAtOpen = ThrottledLuceneNRTDataConsumer.this._version;
                    IndexReader oldReader = ThrottledLuceneNRTDataConsumer.this._currentReader;
                    ThrottledLuceneNRTDataConsumer.this._currentReader = IndexReader.open(writer, true);
                    ThrottledLuceneNRTDataConsumer.this._currentReaderVersion = versionAtOpen;
                    if (oldReader != null) {
                        ThrottledLuceneNRTDataConsumer.this.returnReader(oldReader);
                    }
                }
                catch (IOException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}

