package cn.com.pcauto.tsm.base.amqp;

import cn.com.pcauto.tsm.base.dto.NotifyMsg;
import cn.com.pcauto.tsm.base.dto.Param;
import cn.com.pcauto.tsm.base.exception.AMQPException;
import cn.com.pcauto.tsm.base.util.TsmBaseUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/com/pcauto/tsm/base/amqp/AMQPClient.class */
public class AMQPClient {
    private static final Logger log = LoggerFactory.getLogger(AMQPClient.class);
    private AMQPProperties properties;
    private ConnectionFactory factory;
    private AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();

    public void setProperties(AMQPProperties aMQPProperties) {
        this.properties = aMQPProperties;
        log.debug(">> AmbqUtils init properties, {}", this.properties);
        initFactory();
    }

    private void initFactory() {
        if (null == this.factory) {
            this.factory = new ConnectionFactory();
            this.factory.setHost(this.properties.getHost());
            this.factory.setCredentialsProvider(new AliyunCredentialsProvider(this.properties.getAccessKey(), this.properties.getSecretKey(), this.properties.getInstanceId()));
            this.factory.setAutomaticRecoveryEnabled(true);
            this.factory.setNetworkRecoveryInterval(5000);
            this.factory.setVirtualHost(this.properties.getHostName());
            this.factory.setPort(this.properties.getPort());
            this.factory.setConnectionTimeout(300000);
            this.factory.setHandshakeTimeout(300000);
            this.factory.setShutdownTimeout(0);
        }
    }

    private Connection buildConnection() throws Exception {
        return this.factory.newConnection();
    }

    private Connection buildConnection(String str) throws Exception {
        return this.factory.newConnection(str);
    }

    private Channel buildChannel(Connection connection) throws Exception {
        return connection.createChannel();
    }

    public DefaultConsumer buildDirectConsumer(String str, final ConsumeProcessor<?> consumeProcessor) {
        try {
            final Channel buildChannel = buildChannel(buildConnection());
            DefaultConsumer defaultConsumer = new DefaultConsumer(buildChannel) { // from class: cn.com.pcauto.tsm.base.amqp.AMQPClient.1
                public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    String str3 = new String(bArr, "UTF-8");
                    AMQPClient.log.debug("buildDirectConsumer body {}", str3);
                    try {
                        NotifyMsg<Param> convertMsg = TsmBaseUtil.convertMsg(str3);
                        consumeProcessor.process(convertMsg, str3);
                        AMQPClient.log.debug("Received： " + convertMsg + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + basicProperties.getMessageId());
                    } catch (Exception e) {
                        AMQPClient.log.error(">> 消费异常:", e);
                    }
                    buildChannel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            buildChannel.basicConsume(str, defaultConsumer);
            log.debug(">> 建立消费者成功");
            return defaultConsumer;
        } catch (Exception e) {
            log.error(">> 建立消费者失败");
            throw new AMQPException(e);
        }
    }

    public DefaultConsumer buildDirectConsumer(String str, String str2, String str3, final ConsumeProcessor<?> consumeProcessor) {
        try {
            final Channel buildChannel = buildChannel(buildConnection(str));
            DefaultConsumer defaultConsumer = new DefaultConsumer(buildChannel) { // from class: cn.com.pcauto.tsm.base.amqp.AMQPClient.2
                public void handleDelivery(String str4, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    String str5 = new String(bArr, "UTF-8");
                    AMQPClient.log.debug("buildDirectConsumer body {}", str5);
                    try {
                        NotifyMsg<Param> convertMsg = TsmBaseUtil.convertMsg(str5);
                        consumeProcessor.process(convertMsg, str5);
                        AMQPClient.log.debug("Received： " + convertMsg + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + basicProperties.getMessageId());
                    } catch (Exception e) {
                        AMQPClient.log.error(">> 消费异常:", e);
                    }
                    buildChannel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            buildChannel.basicConsume(str3, false, str2, defaultConsumer);
            log.debug(">> 建立消费者成功");
            return defaultConsumer;
        } catch (Exception e) {
            log.error(">> 建立消费者失败");
            throw new AMQPException(e);
        }
    }

    public void closeConsumer(DefaultConsumer defaultConsumer) throws Exception {
        if (defaultConsumer == null || defaultConsumer.getChannel() == null || !defaultConsumer.getChannel().isOpen()) {
            return;
        }
        defaultConsumer.getChannel().close();
        defaultConsumer.getChannel().getConnection().close();
    }

    public Channel buildProducerChannel() {
        try {
            return buildChannel(buildConnection());
        } catch (Exception e) {
            log.error(">> 建立生产者channel失败");
            throw new AMQPException(e);
        }
    }

    public Channel buildProducerChannel(String str) {
        try {
            return buildChannel(buildConnection(str));
        } catch (Exception e) {
            log.error(">> 建立生产者channel失败");
            throw new AMQPException(e);
        }
    }

    public void produceMsg(Channel channel, String str, String str2, NotifyMsg<?> notifyMsg) throws AMQPException {
        if (channel == null || !channel.isOpen()) {
            throw new AMQPException("producerChannel 为空或已关闭");
        }
        try {
            channel.basicPublish(str, str2, this.props, TsmBaseUtil.writeMsgString(notifyMsg).getBytes());
        } catch (Exception e) {
            throw new AMQPException(e);
        }
    }
}
