/*
 * Decompiled with CFR 0.152.
 */
package com.jfinal.ext.plugin.redis;

import com.jfinal.ext.plugin.redis.JedisKit;
import com.jfinal.ext.plugin.redis.JedisMessage;
import com.jfinal.ext.plugin.redis.TopicNest;
import com.jfinal.log.Logger;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Tuple;

public class TopicConsumer {
    protected final Logger logger = Logger.getLogger(this.getClass());
    private TopicNest topic;
    private TopicNest subscriber;
    private String id;
    private int interval = 1000;

    public TopicConsumer(String id, String topic) {
        this.topic = new TopicNest("topic:" + topic);
        this.subscriber = new TopicNest(this.topic.cat("subscribers").key());
        this.id = id;
    }

    public TopicConsumer interval(int interval) {
        this.interval = interval;
        return this;
    }

    private void waitForMessages() {
        try {
            TimeUnit.MILLISECONDS.sleep(this.interval);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void consume(JedisMessage callback) {
        while (true) {
            Object message;
            if ((message = this.readUntilEnd()) != null) {
                callback.onMessage(message);
                this.goNext();
                continue;
            }
            this.waitForMessages();
        }
    }

    public <T extends Serializable> T consume() {
        T message = this.readUntilEnd();
        this.goNext();
        return message;
    }

    private <T extends Serializable> T readUntilEnd() {
        if (this.unreadMessages() > 0) {
            T message = this.read();
            return message;
        }
        return null;
    }

    private void goNext() {
        JedisKit.zincrby(this.subscriber.key(), 1.0, this.id);
    }

    private int getLastReadMessage() {
        Double lastMessageRead = JedisKit.zscore(this.subscriber.key(), this.id);
        if (lastMessageRead == null) {
            Set<Tuple> zrangeWithScores = JedisKit.zrangeWithScores(this.subscriber.key(), 0L, 1L);
            if (zrangeWithScores.iterator().hasNext()) {
                Tuple next = zrangeWithScores.iterator().next();
                Integer lowest = (int)next.getScore() - 1;
                JedisKit.zadd(this.subscriber.key(), lowest.intValue(), this.id);
                return lowest;
            }
            return 0;
        }
        return lastMessageRead.intValue();
    }

    private int getTopicSize() {
        String stopicSize = (String)JedisKit.get(this.topic.key());
        int topicSize = 0;
        if (stopicSize != null) {
            topicSize = Integer.valueOf(stopicSize);
        }
        return topicSize;
    }

    public <T extends Serializable> T read() {
        int lastReadMessage = this.getLastReadMessage();
        this.logger.debug("lastReadMessage " + lastReadMessage);
        String key = this.topic.cat("message").cat(lastReadMessage + 1).key();
        Object message = JedisKit.get(key);
        this.logger.info("consume the message,key[" + key + "],value[" + message + "]");
        return message;
    }

    public int unreadMessages() {
        return this.getTopicSize() - this.getLastReadMessage();
    }
}

