/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.jms.file;

import com.caucho.config.ConfigException;
import com.caucho.db.jdbc.DataSourceImpl;
import com.caucho.java.WorkDir;
import com.caucho.jms.file.FileQueue;
import com.caucho.jms.file.FileQueueEntry;
import com.caucho.jms.message.BytesMessageImpl;
import com.caucho.jms.message.MapMessageImpl;
import com.caucho.jms.message.MessageFactory;
import com.caucho.jms.message.MessageImpl;
import com.caucho.jms.message.MessageType;
import com.caucho.jms.message.ObjectMessageImpl;
import com.caucho.jms.message.StreamMessageImpl;
import com.caucho.jms.message.TextMessageImpl;
import com.caucho.server.resin.Resin;
import com.caucho.util.L10N;
import com.caucho.vfs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.TextMessage;
import javax.sql.DataSource;

/**
 * JDBC-backed persistence for a file queue.
 *
 * <p>The store keeps one queue table and one message table (named after the
 * local server id) in a {@link DataSourceImpl} rooted at the configured path.
 * A single connection and a fixed set of prepared statements are created by
 * {@link #init()}; every operation that uses them synchronizes on this
 * instance.
 */
public class FileQueueStore {
    private static final L10N L = new L10N(FileQueueStore.class);
    private static final Logger log = Logger.getLogger(FileQueueStore.class.getName());
    /** Lookup table used to turn the integer stored in the "type" column back into an enum. */
    private static final MessageType[] MESSAGE_TYPE = MessageType.values();
    /** Maximum number of rows fetched by a single {@link #receiveStart(FileQueue)} call. */
    private static final int START_LIMIT = 8192;

    private Path _path;
    private DataSource _db;
    private String _name = "default";
    // NOTE(review): stored by setTablePrefix() but not consulted when the
    // table names are built in init(), which hard-codes the "jms_" prefix.
    private String _tablePrefix = "jms";
    private final MessageFactory _messageFactory;
    private String _queueTable;
    private String _messageTable;
    private long _queueId;
    private Connection _conn;
    private PreparedStatement _sendStmt;
    private PreparedStatement _receiveStartStmt;
    private PreparedStatement _readStmt;
    private PreparedStatement _receiveStmt;
    private PreparedStatement _removeStmt;
    private PreparedStatement _deleteStmt;

    /**
     * Creates an unconfigured store; call {@link #init()} before using it.
     *
     * @param messageFactory factory used by {@link #receive()} to build messages
     */
    public FileQueueStore(MessageFactory messageFactory) {
        this._messageFactory = messageFactory;
    }

    /**
     * Sets the queue name used to look up (or create) this queue's row in the
     * queue table.
     */
    public void setName(String name) {
        this._name = name;
    }

    /** Returns the queue name. */
    public String getName() {
        return this._name;
    }

    /**
     * Sets the directory that holds the backing database, creating it if it
     * does not exist yet.
     *
     * @param path directory for the database files
     * @throws ConfigException if the directory cannot be created or the path
     *         is not a directory
     */
    public void setPath(Path path) {
        if (!path.exists()) {
            try {
                path.mkdirs();
            }
            catch (IOException e) {
                throw ConfigException.create((Throwable)e);
            }
        }
        if (!path.isDirectory()) {
            throw new ConfigException(L.l("path '{0}' must be a directory", (Object)path));
        }
        this._path = path;
    }

    /** Returns the configured database directory, or null if none was set. */
    public Path getPath() {
        return this._path;
    }

    /**
     * Records a table prefix. The value is currently only stored; the table
     * names built by {@link #init()} do not use it.
     */
    public void setTablePrefix(String prefix) {
        this._tablePrefix = prefix;
    }

    /**
     * Opens the database, makes sure the tables and this queue's row exist,
     * and prepares the statements used by the other methods.
     *
     * <p>If no path was configured, the local work directory is used. Table
     * names are derived from the local Resin server id: "anon" when no server
     * (or no id) is available and "default" when the id is the empty string.
     *
     * @throws ConfigException if the path is not a directory or any database
     *         operation fails
     */
    public void init() {
        if (this._path == null) {
            this._path = WorkDir.getLocalWorkDir();
        }
        if (!this._path.isDirectory()) {
            throw new ConfigException(L.l("FileQueue requires a valid persistent directory."));
        }
        Resin resin = Resin.getLocal();
        String serverId = null;
        if (resin != null) {
            serverId = resin.getServerId();
        }
        if (serverId == null) {
            serverId = "anon";
        } else if ("".equals(serverId)) {
            serverId = "default";
        }
        this._queueTable = FileQueueStore.escapeName("jms_queue_" + serverId);
        this._messageTable = FileQueueStore.escapeName("jms_message_" + serverId);
        try {
            DataSourceImpl db = new DataSourceImpl(this._path);
            db.setRemoveOnError(true);
            db.init();
            this._db = db;
            this._conn = this._db.getConnection();
            this.initDatabase();
            this.initQueue();
            this.initStatements();
        }
        catch (SQLException e) {
            throw ConfigException.create((Throwable)e);
        }
    }

    /**
     * Inserts a message row for this queue and returns its generated id.
     *
     * @param msg message whose id, properties, type and body are stored
     * @param priority value written to the "priority" column
     * @param expireTime value written to the "expire" column
     * @return the generated primary key of the new row
     * @throws RuntimeException wrapping any failure, including the case where
     *         the database reports no generated key
     */
    public long send(MessageImpl msg, int priority, long expireTime) {
        synchronized (this) {
            try {
                this._sendStmt.setLong(1, this._queueId);
                this._sendStmt.setInt(2, priority);
                this._sendStmt.setLong(3, expireTime);
                this._sendStmt.setString(4, msg.getJMSMessageID());
                // NOTE(review): a length of 0 is passed for both streams; this
                // presumably relies on the driver ignoring the length - confirm.
                this._sendStmt.setBinaryStream(5, msg.propertiesToInputStream(), 0);
                this._sendStmt.setInt(6, msg.getType().ordinal());
                this._sendStmt.setBinaryStream(7, msg.bodyToInputStream(), 0);
                this._sendStmt.executeUpdate();
                if (log.isLoggable(Level.FINE)) {
                    log.fine(this + " send " + msg);
                }
                ResultSet keys = this._sendStmt.getGeneratedKeys();
                try {
                    if (!keys.next()) {
                        throw new IllegalStateException(L.l("insert into message table returned no generated key"));
                    }
                    return keys.getLong(1);
                }
                finally {
                    // Close on every path, including the missing-key failure.
                    keys.close();
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Loads the valid messages of this queue (at most {@code START_LIMIT},
     * ordered by id) and registers each one with the given queue.
     *
     * @param fileQueue queue that receives one {@code addEntry} call per row
     * @return true if fewer than {@code START_LIMIT} rows were read
     * @throws RuntimeException if the query fails; runtime exceptions are
     *         rethrown unchanged, checked ones are wrapped
     */
    boolean receiveStart(FileQueue fileQueue) {
        synchronized (this) {
            try {
                this._receiveStartStmt.setLong(1, this._queueId);
                ResultSet rs = this._receiveStartStmt.executeQuery();
                try {
                    int count = 0;
                    while (rs.next()) {
                        long id = rs.getLong(1);
                        String msgId = rs.getString(2);
                        int priority = rs.getInt(3);
                        long expire = rs.getLong(4);
                        MessageType type = MESSAGE_TYPE[rs.getInt(5)];
                        fileQueue.addEntry(id, msgId, -1L, priority, expire, type);
                        ++count;
                    }
                    return count < START_LIMIT;
                }
                finally {
                    rs.close();
                }
            }
            catch (RuntimeException e) {
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Reads the message row with the given id and rebuilds the message from
     * its stored type, message id, properties and body.
     *
     * @param id primary key of the message row
     * @param type ignored; the type stored in the row decides which message
     *        class is instantiated
     * @return the rebuilt message, or null if no row has that id
     * @throws RuntimeException wrapping any failure
     */
    public MessageImpl readMessage(long id, MessageType type) {
        synchronized (this) {
            try {
                this._readStmt.setLong(1, id);
                ResultSet rs = this._readStmt.executeQuery();
                try {
                    if (!rs.next()) {
                        return null;
                    }
                    MessageImpl msg = FileQueueStore.newMessage(MESSAGE_TYPE[rs.getInt(1)]);
                    msg.setJMSMessageID(rs.getString(2));
                    InputStream is = rs.getBinaryStream(3);
                    if (is != null) {
                        try {
                            msg.readProperties(is);
                        }
                        finally {
                            is.close();
                        }
                    }
                    is = rs.getBinaryStream(4);
                    if (is != null) {
                        try {
                            msg.readBody(is);
                        }
                        finally {
                            is.close();
                        }
                    }
                    return msg;
                }
                finally {
                    // Previously the result set was left open when a row was found.
                    rs.close();
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Takes one message row of this queue, deletes it, and returns a message.
     *
     * <p>NOTE(review): the returned message is a text message with the fixed
     * content "sample", not the content of the deleted row. This looks like
     * placeholder behavior; it is preserved as-is - confirm with callers
     * before changing.
     *
     * @return a message if a row was found and deleted, otherwise null
     * @throws RuntimeException wrapping any failure
     */
    public MessageImpl receive() {
        synchronized (this) {
            try {
                this._receiveStmt.setLong(1, this._queueId);
                long id;
                ResultSet rs = this._receiveStmt.executeQuery();
                try {
                    if (!rs.next()) {
                        return null;
                    }
                    id = rs.getLong(1);
                }
                finally {
                    // The result set is closed before the delete is issued.
                    rs.close();
                }
                TextMessage msg = this._messageFactory.createTextMessage("sample");
                this._deleteStmt.setLong(1, id);
                this._deleteStmt.executeUpdate();
                return (MessageImpl)msg;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Runs the "remove" update (clears the body, sets is_valid to 0 and
     * resets the expire column) with the given value bound to the statement's
     * single parameter.
     *
     * <p>NOTE(review): the statement's WHERE clause compares the numeric "id"
     * column while this method binds a String - verify what callers pass.
     *
     * @throws RuntimeException wrapping any failure
     */
    public void remove(String id) {
        synchronized (this) {
            try {
                this._removeStmt.setString(1, id);
                this._removeStmt.executeUpdate();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Deletes the message row with the given primary key.
     *
     * @throws RuntimeException wrapping any failure
     */
    void delete(long id) {
        synchronized (this) {
            try {
                this._deleteStmt.setLong(1, id);
                this._deleteStmt.executeUpdate();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Makes sure the queue and message tables exist. A probe query is run
     * against the message table; if it fails, both tables are dropped
     * (ignoring errors) and created from scratch.
     */
    private void initDatabase() throws SQLException {
        Statement stmt = this._conn.createStatement();
        try {
            try {
                String probe = "select id, priority from " + this._messageTable + " where 1=0";
                ResultSet rs = stmt.executeQuery(probe);
                rs.close();
                return;
            }
            catch (SQLException e) {
                // Probe failed: fall through and (re)create the schema.
                log.finer(e.toString());
            }
            this.dropTableQuietly(stmt, this._queueTable);
            this.dropTableQuietly(stmt, this._messageTable);
            String sql = "create table " + this._queueTable + " (" + "  id bigint primary key auto_increment," + "  name varchar(128)" + ")";
            stmt.executeUpdate(sql);
            sql = "create table " + this._messageTable + " (" + "  id bigint primary key auto_increment," + "  queue bigint," + "  state integer," + "  priority integer," + "  expire datetime," + "  owner_1 bigint," + "  owner_2 bigint," + "  msg_id varchar(64)," + "  header blob," + "  type integer," + "  body blob," + "  is_valid bit" + ")";
            stmt.executeUpdate(sql);
        }
        finally {
            // The statement was previously never closed.
            stmt.close();
        }
    }

    /** Drops a table, logging (not propagating) any failure such as "no such table". */
    private void dropTableQuietly(Statement stmt, String table) {
        try {
            stmt.executeUpdate("drop table " + table);
        }
        catch (SQLException e) {
            log.finer(e.toString());
        }
    }

    /**
     * Resolves {@code _queueId}: looks up the row named {@link #getName()} in
     * the queue table and inserts it if it is missing.
     *
     * @throws IllegalStateException if the insert yields no generated key
     */
    private void initQueue() throws SQLException {
        String sql = "select id from " + this._queueTable + " where name=?";
        PreparedStatement selectStmt = this._conn.prepareStatement(sql);
        try {
            selectStmt.setString(1, this.getName());
            ResultSet rs = selectStmt.executeQuery();
            try {
                if (rs.next()) {
                    this._queueId = rs.getLong(1);
                    return;
                }
            }
            finally {
                rs.close();
            }
        }
        finally {
            selectStmt.close();
        }

        sql = "insert into " + this._queueTable + " (name) values(?)";
        PreparedStatement insertStmt = this._conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
        try {
            insertStmt.setString(1, this.getName());
            insertStmt.executeUpdate();
            ResultSet keys = insertStmt.getGeneratedKeys();
            try {
                if (!keys.next()) {
                    throw new IllegalStateException(L.l("insert into queue table returned no generated key"));
                }
                this._queueId = keys.getLong(1);
            }
            finally {
                keys.close();
            }
        }
        finally {
            insertStmt.close();
        }
    }

    /** Prepares the statements shared by the public operations. */
    private void initStatements() throws SQLException {
        String sql = "insert into " + this._messageTable + " (queue,priority,expire,msg_id,header,type,body,is_valid)" + " VALUES(?,?,?,?,?,?,?,1)";
        this._sendStmt = this._conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
        sql = "select id,msg_id,header,body from " + this._messageTable + " WHERE queue=? LIMIT 1";
        this._receiveStmt = this._conn.prepareStatement(sql);
        sql = "select type,msg_id,header,body from " + this._messageTable + " WHERE id=?";
        this._readStmt = this._conn.prepareStatement(sql);
        sql = "select id,msg_id,priority,expire,type from " + this._messageTable + " WHERE queue=? AND is_valid=1 ORDER BY id LIMIT " + START_LIMIT;
        this._receiveStartStmt = this._conn.prepareStatement(sql);
        // A redundant prepare of _removeStmt from the select above was dropped
        // here; it was overwritten immediately and leaked a statement.
        sql = "update " + this._messageTable + " set body=null, is_valid=0, expire=now() + 120000" + " WHERE id=?";
        this._removeStmt = this._conn.prepareStatement(sql);
        sql = "delete from " + this._messageTable + " WHERE id=?";
        this._deleteStmt = this._conn.prepareStatement(sql);
    }

    /** Instantiates the message implementation that corresponds to a stored type. */
    private static MessageImpl newMessage(MessageType type) {
        switch (type) {
            case NULL:
                return new MessageImpl();
            case BYTES:
                return new BytesMessageImpl();
            case MAP:
                return new MapMessageImpl();
            case OBJECT:
                return new ObjectMessageImpl();
            case STREAM:
                return new StreamMessageImpl();
            case TEXT:
                return new TextMessageImpl();
            default:
                return new MessageImpl();
        }
    }

    /**
     * Turns an arbitrary string into a safe table name by replacing every
     * character other than ASCII letters, ASCII digits and '_' with '_'.
     *
     * <p>The digit check previously accepted only '0' (upper bound typo), so
     * digits 1-9 were replaced and server ids differing only in those digits
     * mapped to the same tables. NOTE(review): databases created with the old
     * mapping for such ids will use differently named tables.
     */
    private static String escapeName(String name) {
        StringBuilder sb = new StringBuilder(name.length());
        for (int i = 0; i < name.length(); ++i) {
            char ch = name.charAt(i);
            boolean isLetter = ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z');
            boolean isDigit = '0' <= ch && ch <= '9';
            sb.append(isLetter || isDigit || ch == '_' ? ch : '_');
        }
        return sb.toString();
    }

    /** Returns the simple class name followed by the message table name in brackets. */
    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "[" + this._messageTable + "]";
    }
}

