/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.jms.hemp;

import com.caucho.config.ConfigException;
import com.caucho.hmtp.HmtpError;
import com.caucho.hmtp.HmtpMessageStream;
import com.caucho.hmtp.HmtpStream;
import com.caucho.hmtp.spi.HmtpBroker;
import com.caucho.hmtp.spi.SimpleHmtpService;
import com.caucho.jms.connection.JmsSession;
import com.caucho.jms.memory.MemorySubscriberQueue;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.message.ObjectMessageImpl;
import com.caucho.jms.queue.AbstractQueue;
import com.caucho.jms.queue.AbstractTopic;
import com.caucho.util.L10N;
import com.caucho.webbeans.manager.WebBeansContainer;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.PostConstruct;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.webbeans.ComponentFactory;

/**
 * JMS topic that receives its messages from an HMTP broker.
 *
 * <p>On {@link #init()} the topic registers an inner {@link TopicResource}
 * service with the broker under the topic's name. Every message the broker
 * delivers to that service is offered to each subscriber queue created through
 * {@link #createSubscriber(JmsSession, String, boolean)}.
 *
 * <p>All access to the subscriber list (and to the anonymous-subscriber
 * counter) is guarded by the list's own monitor, so subscribers may be added
 * or removed while messages are being delivered.
 */
public class HempTopic
extends AbstractTopic
implements HmtpMessageStream {
    private static final L10N L = new L10N(HempTopic.class);
    private static final Logger log = Logger.getLogger(HempTopic.class.getName());

    /** Active subscriber queues; also serves as the lock guarding itself and {@code _id}. */
    private final ArrayList<AbstractQueue> _subscriptionList = new ArrayList<AbstractQueue>();
    private HmtpBroker _broker;
    // NOTE(review): only assigned when init() looks the broker up itself; it stays
    // null when the broker was injected via setBroker(). Not read in this class.
    private HmtpStream _brokerStream;
    /** Service registered with the broker that forwards incoming messages to this topic. */
    private final TopicResource _resource = new TopicResource();
    /** Counter used to name anonymous subscribers; guarded by {@code _subscriptionList}. */
    private int _id;
    /** Set once the resource has been registered with the broker. */
    private boolean _isInit;

    /**
     * Sets the broker explicitly, bypassing the container lookup done in {@link #init()}.
     *
     * @param broker the broker to register this topic's service with
     */
    public void setBroker(HmtpBroker broker) {
        this._broker = broker;
    }

    /**
     * Returns the topic's URL, built as {@code "xmpp:name=" + getName()}.
     *
     * @return the URL string for this topic
     */
    public String getUrl() {
        return "xmpp:name=" + this.getName();
    }

    /**
     * Initializes the topic and registers its service with the broker.
     *
     * <p>If no broker was supplied through {@link #setBroker(HmtpBroker)}, one is
     * resolved by type from the {@link WebBeansContainer}. The registration with
     * the broker happens at most once, even if this method is called repeatedly.
     *
     * @throws ConfigException if no broker component can be resolved, or the
     *         resolved component yields {@code null}
     */
    @PostConstruct
    public void init() {
        super.init();
        if (this._broker == null) {
            WebBeansContainer container = WebBeansContainer.create();
            ComponentFactory<HmtpBroker> comp = container.resolveByType(HmtpBroker.class, new Annotation[0]);
            if (comp == null) {
                throw new ConfigException(L.l("hmpp protocol needs broker"));
            }
            this._broker = (HmtpBroker)comp.get();
            if (this._broker == null) {
                throw new ConfigException(L.l("Need xmpp protocol"));
            }
            this._brokerStream = this._broker.getBrokerStream();
        }
        if (this._isInit) {
            return;
        }
        this._isInit = true;
        // The topic name doubles as the JID under which the service is registered.
        this._resource.setJid(this.getName());
        this._broker.addService(this._resource);
    }

    /**
     * Creates a subscriber queue and adds it to the delivery list.
     *
     * <p>The queue is named {@code getName() + ":sub-" + name} when {@code name}
     * is non-null; otherwise the suffix is the next value of an internal counter.
     *
     * @param session the session the subscriber belongs to
     * @param name the subscription name, or {@code null} for an anonymous subscriber
     * @param noLocal passed through to the {@link MemorySubscriberQueue} constructor
     * @return the newly created queue
     */
    public AbstractQueue createSubscriber(JmsSession session, String name, boolean noLocal) {
        MemorySubscriberQueue queue = new MemorySubscriberQueue(session, noLocal);
        // Same monitor as sendMessage(): the list must not change while it is being
        // iterated, and the counter increment must not race with another creator.
        synchronized (this._subscriptionList) {
            String suffix = name != null ? name : String.valueOf(this._id++);
            queue.setName(this.getName() + ":sub-" + suffix);
            this._subscriptionList.add(queue);
        }
        if (log.isLoggable(Level.FINE)) {
            log.fine(this + " create-subscriber(" + queue + ")");
        }
        return queue;
    }

    /**
     * Removes a subscriber queue from the delivery list.
     *
     * @param queue the queue previously returned by
     *        {@link #createSubscriber(JmsSession, String, boolean)}
     */
    public void closeSubscriber(AbstractQueue queue) {
        if (log.isLoggable(Level.FINE)) {
            log.fine(this + " close-subscriber(" + queue + ")");
        }
        // Guarded by the same monitor as the delivery loop in sendMessage().
        synchronized (this._subscriptionList) {
            this._subscriptionList.remove(queue);
        }
    }

    /**
     * Delivers a message received from the broker to every current subscriber.
     *
     * <p>A value that already is a JMS {@link Message} is offered as-is; any other
     * value is wrapped in an {@link ObjectMessageImpl}.
     *
     * @param to the target address (not used here)
     * @param from the source address (not used here)
     * @param value the payload to deliver
     * @throws RuntimeException wrapping any {@link JMSException} raised during delivery
     */
    public void sendMessage(String to, String from, Serializable value) {
        try {
            Object msg = value instanceof Message ? (Message)value : new ObjectMessageImpl(value);
            synchronized (this._subscriptionList) {
                for (int i = 0; i < this._subscriptionList.size(); ++i) {
                    // Every entry is added by createSubscriber(), which only
                    // creates MemorySubscriberQueue instances.
                    MemorySubscriberQueue queue = (MemorySubscriberQueue)this._subscriptionList.get(i);
                    queue.offer(msg);
                }
            }
        }
        catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Handles an error reply for a message; the error is only logged at FINER level.
     *
     * @param to the target address
     * @param from the source address
     * @param value the payload of the failed message (not logged)
     * @param error the error reported for the message
     */
    public void sendMessageError(String to, String from, Serializable value, HmtpError error) {
        if (log.isLoggable(Level.FINER)) {
            log.finer(this + " sendMessageError to=" + to + " from=" + from + " error=" + error);
        }
    }

    /**
     * JMS-side publish entry point. The body is empty, so this method does
     * nothing with the message.
     *
     * <p>NOTE(review): subscribers only receive messages that arrive through
     * {@link #sendMessage(String, String, Serializable)}; confirm that dropping
     * JMS-side sends is intended.
     *
     * @param session the sending session (ignored)
     * @param msg the message to publish (ignored)
     * @param timeout the send timeout (ignored)
     * @throws JMSException declared by the signature; never thrown by this implementation
     */
    public void send(JmsSession session, MessageImpl msg, long timeout) throws JMSException {
    }

    /**
     * Service registered with the broker; forwards each incoming message to the
     * enclosing topic.
     */
    class TopicResource
    extends SimpleHmtpService {
        TopicResource() {
        }

        /**
         * Delegates to the superclass setter. Presumably redeclared so the
         * enclosing topic can call it from {@code init()} — confirm against
         * {@code SimpleHmtpService}.
         *
         * @param jid the JID to register this service under
         */
        protected void setJid(String jid) {
            super.setJid(jid);
        }

        /**
         * Hands the message to {@link HempTopic#sendMessage(String, String, Serializable)}.
         *
         * @param to the target address
         * @param from the source address
         * @param msg the payload to deliver
         */
        public void sendMessage(String to, String from, Serializable msg) {
            HempTopic.this.sendMessage(to, from, msg);
        }
    }
}

