/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.jms.connection;

import com.caucho.jms.connection.JmsSession;
import com.caucho.jms.connection.TemporaryQueueImpl;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.message.ObjectMessageImpl;
import com.caucho.jms.message.TextMessageImpl;
import com.caucho.jms.queue.AbstractDestination;
import com.caucho.jms.queue.AbstractQueue;
import com.caucho.jms.queue.MessageCallback;
import com.caucho.jms.queue.MessageException;
import com.caucho.jms.queue.QueueEntry;
import com.caucho.jms.selector.Selector;
import com.caucho.jms.selector.SelectorParser;
import com.caucho.util.CurrentTime;
import com.caucho.util.L10N;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

/**
 * JMS {@link MessageConsumer} that reads from a single {@link AbstractQueue}.
 *
 * <p>Messages can be pulled synchronously through the {@code receive} family
 * of methods, or pushed to a {@link MessageListener} through a
 * {@link MessageConsumerCallback} that is registered with the queue.
 *
 * <p>Raw payloads taken from the queue are converted to {@link MessageImpl}
 * instances before they are handed to the application: a payload that already
 * is a {@code MessageImpl} is used as-is, a {@code String} is wrapped in a
 * {@link TextMessageImpl}, and anything else is wrapped in an
 * {@link ObjectMessageImpl}.
 *
 * @param <E> payload type stored in the underlying queue
 */
public class MessageConsumerImpl<E>
implements MessageConsumer {
    private static final Logger log = Logger.getLogger(MessageConsumerImpl.class.getName());
    private static final L10N L = new L10N(MessageConsumerImpl.class);

    /** Largest timeout (in milliseconds) passed on to the queue; equals 0x3FFFFFFFFFFFFFFF. */
    private static final long MAX_TIMEOUT = Long.MAX_VALUE / 2;
    /** Timeout used by {@link #receive()} when {@code CurrentTime.isTest()} is true. */
    private static final long TEST_RECEIVE_TIMEOUT = 600000L;
    /** Value of {@code javax.jms.Session.AUTO_ACKNOWLEDGE} as defined by the JMS API. */
    private static final int ACK_MODE_AUTO = 1;
    /** Value of {@code javax.jms.Session.DUPS_OK_ACKNOWLEDGE} as defined by the JMS API. */
    private static final int ACK_MODE_DUPS_OK = 3;

    /** Monitor that is notified whenever {@link #notifyMessageAvailable()} is called. */
    private final Object _consumerLock = new Object();
    protected final JmsSession _session;
    private final AbstractQueue<E> _queue;
    /** Listener most recently passed to {@code setMessageListener}; may be null. */
    private MessageListener _messageListener;
    /** Queue callback wrapping {@link #_messageListener}; null when no listener is set. */
    private MessageConsumerCallback _messageCallback;
    private final String _messageSelector;
    /** Parsed form of the selector string; null when no selector was given. */
    protected Selector _selector;
    private final boolean _noLocal;
    /** True when the session's acknowledge mode is AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE. */
    private final boolean _isAutoAcknowledge;
    /** Set by {@link #start()} and cleared by {@link #stop()}. */
    private final AtomicBoolean _isActive = new AtomicBoolean();
    private final AtomicBoolean _isClosed = new AtomicBoolean();

    /**
     * Creates a consumer for the given queue.
     *
     * @param session owning session; its acknowledge mode is sampled once here
     * @param queue queue the consumer reads from
     * @param messageSelector selector expression, or null for no filtering
     * @param noLocal value later reported by {@link #getNoLocal()}
     * @throws JMSException if the selector expression cannot be parsed
     */
    MessageConsumerImpl(JmsSession session, AbstractQueue<E> queue, String messageSelector, boolean noLocal) throws JMSException {
        _session = session;
        _queue = queue;
        _messageSelector = messageSelector;
        if (messageSelector != null) {
            _selector = new SelectorParser().parse(messageSelector);
        }
        _noLocal = noLocal;
        int ackMode = session.getAcknowledgeMode();
        _isAutoAcknowledge = ackMode == ACK_MODE_AUTO || ackMode == ACK_MODE_DUPS_OK;
    }

    /**
     * Returns the queue this consumer reads from.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    protected AbstractDestination<E> getDestination() throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("getDestination(): MessageConsumer is closed."));
        }
        return _queue;
    }

    /**
     * Returns the {@code noLocal} flag given at construction.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public boolean getNoLocal() throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("getNoLocal(): MessageConsumer is closed."));
        }
        return _noLocal;
    }

    /**
     * Returns the currently configured listener, or null if none is set.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public MessageListener getMessageListener() throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("getMessageListener(): MessageConsumer is closed."));
        }
        return _messageListener;
    }

    /**
     * Sets the listener; equivalent to {@code setMessageListener(listener, -1)}.
     *
     * @param listener listener to deliver to, or null to remove the current one
     * @throws JMSException if the consumer or its session is closed
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        setMessageListener(listener, -1L);
    }

    /**
     * Replaces the listener that receives pushed messages.
     *
     * <p>The callback of any previously set listener is removed from the queue
     * first, so a replaced listener no longer receives messages. Passing
     * {@code null} clears the listener without registering a new callback.
     *
     * @param listener listener to deliver to, or null to remove the current one
     * @param pollInterval not used by this implementation
     * @throws JMSException if the consumer or its session is closed
     */
    public void setMessageListener(MessageListener listener, long pollInterval) throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("setMessageListener(): MessageConsumer is closed."));
        }
        MessageConsumerCallback previous = _messageCallback;
        if (previous != null) {
            _queue.removeMessageCallback(previous);
        }
        _messageListener = listener;
        if (listener == null) {
            _messageCallback = null;
            return;
        }
        // The callback captures the caller's context class loader for later delivery.
        _messageCallback = new MessageConsumerCallback(listener);
        if (isActive()) {
            addMessageCallback();
        }
    }

    /**
     * Returns the selector expression given at construction, or null.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public String getMessageSelector() throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("getMessageSelector(): MessageConsumer is closed."));
        }
        return _messageSelector;
    }

    /** Returns the parsed selector, or null when no selector was given. */
    public Selector getSelector() {
        return _selector;
    }

    /**
     * Reports whether the owning session is active.
     *
     * @throws JMSException if the consumer or its session is closed
     */
    public boolean isActive() throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("isActive(): MessageConsumer is closed."));
        }
        // Re-check the closed flag in case close() ran since the guard above.
        return _session.isActive() && !_isClosed.get();
    }

    /** Returns true once this consumer or its session has been closed. */
    public boolean isClosed() {
        return _isClosed.get() || _session.isClosed();
    }

    /**
     * Receives the next message using the largest supported timeout, or a
     * ten-minute timeout when {@code CurrentTime.isTest()} is true.
     *
     * @return the message, or null if none was obtained
     * @throws JMSException if the consumer is closed or the receive fails
     */
    public Message receive() throws JMSException {
        long timeout = CurrentTime.isTest() ? TEST_RECEIVE_TIMEOUT : MAX_TIMEOUT;
        return receiveImpl(timeout);
    }

    /**
     * Receives a message with a zero timeout.
     *
     * @return the message, or null if none was obtained
     * @throws JMSException if the consumer is closed or the receive fails
     */
    public Message receiveNoWait() throws JMSException {
        return receiveImpl(0L);
    }

    /**
     * Receives a message with the given timeout and logs it at FINE level.
     *
     * @param timeout timeout in milliseconds; negative or oversized values are
     *     treated as the largest supported timeout
     * @return the message, or null if none was obtained
     * @throws JMSException if the consumer is closed or the receive fails
     */
    public Message receive(long timeout) throws JMSException {
        Message msg = receiveImpl(timeout);
        if (msg != null && log.isLoggable(Level.FINE)) {
            log.fine(_queue + " receive message " + msg);
        }
        return msg;
    }

    /**
     * Shared implementation of the {@code receive} variants.
     *
     * <p>Returns null when the session is not active, when the queue yields no
     * entry, or when the entry has no payload. When the session is not
     * auto-acknowledging, the returned message is also registered with the
     * session through {@code addTransactedReceive}.
     *
     * @param timeout timeout in milliseconds; 0 yields an expire time of 0,
     *     negative or oversized values are clamped to the largest timeout
     * @return the received message, or null
     * @throws JMSException if the consumer is closed or conversion fails
     */
    protected Message receiveImpl(long timeout) throws JMSException {
        if (isClosed()) {
            throw new IllegalStateException(L.l("receive(): MessageConsumer is closed."));
        }
        long effectiveTimeout = timeout;
        if (effectiveTimeout < 0L || MAX_TIMEOUT < effectiveTimeout) {
            effectiveTimeout = MAX_TIMEOUT;
        }
        long now = CurrentTime.getCurrentTimeActual();
        long expireTime = effectiveTimeout > 0L ? now + effectiveTimeout : 0L;
        if (!_session.isActive()) {
            return null;
        }
        QueueEntry<E> entry = _queue.receiveEntry(expireTime, _isAutoAcknowledge, _selector);
        if (entry == null) {
            return null;
        }
        E payload = entry.getPayload();
        if (payload == null) {
            return null;
        }
        MessageImpl msg = toMessage(entry.getMsgId(), payload);
        msg.setReceive();
        if (log.isLoggable(Level.FINE)) {
            log.fine(_queue + " receiving message " + msg);
        }
        if (!_isAutoAcknowledge) {
            _session.addTransactedReceive(_queue, msg);
        }
        return msg;
    }

    /**
     * Converts a raw queue payload into a {@link MessageImpl}.
     *
     * <p>A payload that already is a {@code MessageImpl} is returned unchanged
     * (its message id is not touched). Otherwise the payload is wrapped in a
     * text or object message and stamped with {@code msgId}.
     */
    private MessageImpl toMessage(String msgId, E payload) throws JMSException {
        if (payload instanceof MessageImpl) {
            return (MessageImpl) payload;
        }
        MessageImpl wrapped;
        if (payload instanceof String) {
            wrapped = new TextMessageImpl((String) payload);
        } else {
            wrapped = new ObjectMessageImpl((Serializable) payload);
        }
        wrapped.setJMSMessageID(msgId);
        return wrapped;
    }

    /**
     * Wakes all threads waiting on the consumer lock and forwards the
     * notification to the session.
     *
     * @return the result of the session's {@code notifyMessageAvailable()}
     */
    public boolean notifyMessageAvailable() {
        synchronized (_consumerLock) {
            _consumerLock.notifyAll();
        }
        return _session.notifyMessageAvailable();
    }

    /**
     * Poll-style delivery hook; it delivers nothing and always returns false.
     *
     * <p>This method has no source to obtain a message from, so there is
     * never anything to hand to a listener. Listener delivery is performed by
     * {@link MessageConsumerCallback#messageReceived(String, Object)} instead.
     *
     * @param listener ignored
     * @return always false
     */
    boolean handleMessage(MessageListener listener) {
        return false;
    }

    /**
     * Registers the current listener callback with the queue, if one is set.
     */
    public void addMessageCallback() {
        MessageConsumerCallback callback = _messageCallback;
        if (callback != null) {
            _queue.addMessageCallback(callback, _isAutoAcknowledge);
        }
    }

    /** Marks the consumer active and registers the listener callback, if any. */
    public void start() {
        _isActive.set(true);
        addMessageCallback();
    }

    /**
     * Marks the consumer inactive and removes the listener callback, if any,
     * from the queue.
     *
     * @throws JMSException declared for callers; see the queue implementation
     */
    public void stop() throws JMSException {
        _isActive.set(false);
        MessageConsumerCallback callback = _messageCallback;
        if (callback != null) {
            _queue.removeMessageCallback(callback);
        }
    }

    /**
     * Closes the consumer. Only the first call has any effect: it notifies a
     * temporary queue that a consumer went away and detaches the consumer
     * from its session.
     *
     * @throws JMSException if the session fails to remove the consumer
     */
    public void close() throws JMSException {
        if (_isClosed.getAndSet(true)) {
            return;
        }
        if (_queue instanceof TemporaryQueueImpl) {
            ((TemporaryQueueImpl) _queue).removeMessageConsumer();
        }
        _session.removeConsumer(this);
    }

    public String toString() {
        return getClass().getSimpleName() + "[" + _queue + "]";
    }

    /**
     * Queue callback that pushes each received payload to a
     * {@link MessageListener}, using the context class loader that was
     * current when the callback was created.
     */
    class MessageConsumerCallback
    implements MessageCallback<E> {
        private final MessageListener _listener;
        private final ClassLoader _classLoader;

        /**
         * @param listener listener that receives the converted messages
         */
        MessageConsumerCallback(MessageListener listener) {
            _listener = listener;
            _classLoader = Thread.currentThread().getContextClassLoader();
        }

        /**
         * Returns true when the consumer is closed or has not been started
         * (or has been stopped).
         */
        public boolean isClosed() {
            return MessageConsumerImpl.this.isClosed() || !MessageConsumerImpl.this._isActive.get();
        }

        /**
         * Converts the payload to a message and delivers it to the listener.
         *
         * <p>Messages rejected by the consumer's selector are dropped silently.
         * After the listener returns or throws, the session is committed when
         * transacted and acknowledged otherwise.
         *
         * @param msgId message id applied to payloads that are wrapped here
         * @param payload raw payload taken from the queue
         * @throws MessageException wrapping any {@link JMSException} raised
         */
        @Override
        public void messageReceived(String msgId, E payload) {
            try {
                MessageImpl message = MessageConsumerImpl.this.toMessage(msgId, payload);
                Selector selector = MessageConsumerImpl.this._selector;
                if (selector != null && !selector.isMatch(message)) {
                    return;
                }
                MessageConsumerImpl.this._session.addTransactedReceive(MessageConsumerImpl.this._queue, message);
                Thread thread = Thread.currentThread();
                ClassLoader oldLoader = thread.getContextClassLoader();
                try {
                    thread.setContextClassLoader(_classLoader);
                    MessageConsumerImpl.this._session.acquireListenSemaphore();
                    _listener.onMessage(message);
                }
                finally {
                    // Order kept from the original: restore loader, release, then commit/ack.
                    thread.setContextClassLoader(oldLoader);
                    MessageConsumerImpl.this._session.releaseListenSemaphore();
                    if (MessageConsumerImpl.this._session.getTransacted()) {
                        MessageConsumerImpl.this._session.commit();
                    } else {
                        MessageConsumerImpl.this._session.acknowledge();
                    }
                }
            }
            catch (JMSException e) {
                throw new MessageException(e);
            }
        }
    }
}

