/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.example;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.exception.DatahubClientException;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.GetCursorRequest;
import com.aliyun.datahub.model.GetCursorResult;
import com.aliyun.datahub.model.GetRecordsResult;
import com.aliyun.datahub.model.GetTopicResult;
import com.aliyun.datahub.model.OffsetContext;
import com.aliyun.datahub.model.RecordEntry;
import java.util.List;

public class SingleSubscriptionExample {
    private String accessId = "**you access id**";
    private String accessKey = "**you access key**";
    private String endpoint = "**datahub server endpoint**";
    private String projectName = "**you project name**";
    private String topicName = "**you topic name**";
    private String subId = "**you subscription id**";
    private String shardId = "**shard id**";
    private DatahubConfiguration conf = new DatahubConfiguration(new AliyunAccount(this.accessId, this.accessKey), this.endpoint);
    private DatahubClient client = new DatahubClient(this.conf);

    public void Start() {
        try {
            boolean bExit = false;
            GetTopicResult topicResult = this.client.getTopic(this.projectName, this.topicName);
            OffsetContext offsetCtx = this.client.initOffsetContext(this.projectName, this.topicName, this.subId, this.shardId);
            String cursor = null;
            if (!offsetCtx.hasOffset()) {
                GetCursorResult cursorResult = this.client.getCursor(this.projectName, this.topicName, this.shardId, GetCursorRequest.CursorType.OLDEST);
                cursor = cursorResult.getCursor();
            } else {
                cursor = this.client.getNextOffsetCursor(offsetCtx).getCursor();
            }
            System.out.println("Start consume records, begin offset context:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
            long recordNum = 0L;
            while (!bExit) {
                try {
                    GetRecordsResult recordResult = this.client.getRecords(this.projectName, this.topicName, this.shardId, cursor, 10, topicResult.getRecordSchema());
                    List<RecordEntry> records = recordResult.getRecords();
                    if (records.size() == 0) {
                        this.client.commitOffset(offsetCtx);
                        System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                        Thread.sleep(1000L);
                        System.out.println("sleep 1s and continue consume records! shard id:" + this.shardId);
                        continue;
                    }
                    for (RecordEntry record : records) {
                        offsetCtx.setOffset(record.getOffset());
                        if (++recordNum % 100L != 0L) continue;
                        this.client.commitOffset(offsetCtx);
                        System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                    }
                    cursor = recordResult.getNextCursor();
                }
                catch (SubscriptionOfflineException e) {
                    bExit = true;
                    e.printStackTrace();
                }
                catch (OffsetResetedException e) {
                    this.client.updateOffsetContext(offsetCtx);
                    cursor = this.client.getNextOffsetCursor(offsetCtx).getCursor();
                    System.out.println("Restart consume shard:" + this.shardId + ", reset offset:" + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
                }
                catch (OffsetSessionChangedException e) {
                    bExit = true;
                    e.printStackTrace();
                }
                catch (Exception e) {
                    bExit = true;
                    e.printStackTrace();
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SingleSubscriptionExample example = new SingleSubscriptionExample();
        try {
            example.Start();
        }
        catch (DatahubClientException e) {
            e.printStackTrace();
        }
    }
}

