package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.class */
public class RollingPolicyTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    @Test
    public void testDefaultRollingPolicy() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink = TestUtils.createCustomRescalingTestSink(newFolder, 1, 0, 1L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder(), DefaultRollingPolicy.create().withMaxPartSize(10L).withInactivityInterval(4L).withRolloverInterval(11L).build(), new DefaultBucketFactoryImpl());
        Throwable th = null;
        try {
            try {
                createCustomRescalingTestSink.setup();
                createCustomRescalingTestSink.open();
                createCustomRescalingTestSink.setProcessingTime(0L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
                TestUtils.checkLocalFs(newFolder, 1, 0);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
                TestUtils.checkLocalFs(newFolder, 1, 0);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
                TestUtils.checkLocalFs(newFolder, 2, 0);
                createCustomRescalingTestSink.setProcessingTime(7L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
                TestUtils.checkLocalFs(newFolder, 3, 0);
                createCustomRescalingTestSink.setProcessingTime(20L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
                TestUtils.checkLocalFs(newFolder, 4, 0);
                createCustomRescalingTestSink.snapshot(1L, 1L);
                TestUtils.checkLocalFs(newFolder, 4, 0);
                createCustomRescalingTestSink.notifyOfCompletedCheckpoint(1L);
                TestUtils.checkLocalFs(newFolder, 1, 3);
                if (createCustomRescalingTestSink != null) {
                    if (0 == 0) {
                        createCustomRescalingTestSink.close();
                        return;
                    }
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createCustomRescalingTestSink != null) {
                if (th != null) {
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createCustomRescalingTestSink.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testRollOnCheckpointPolicy() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink = TestUtils.createCustomRescalingTestSink(newFolder, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder(), OnCheckpointRollingPolicy.build(), new DefaultBucketFactoryImpl());
        Throwable th = null;
        try {
            try {
                createCustomRescalingTestSink.setup();
                createCustomRescalingTestSink.open();
                createCustomRescalingTestSink.setProcessingTime(0L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
                TestUtils.checkLocalFs(newFolder, 2, 0);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L));
                TestUtils.checkLocalFs(newFolder, 2, 0);
                createCustomRescalingTestSink.snapshot(1L, 1L);
                for (File file : FileUtils.listFiles(newFolder, (String[]) null, true)) {
                    if (Objects.equals(file.getParentFile().getName(), "test1")) {
                        Assert.assertTrue(file.getName().contains(".part-0-1.inprogress."));
                    } else if (Objects.equals(file.getParentFile().getName(), "test2")) {
                        Assert.assertTrue(file.getName().contains(".part-0-0.inprogress."));
                    }
                }
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
                TestUtils.checkLocalFs(newFolder, 3, 0);
                createCustomRescalingTestSink.notifyOfCompletedCheckpoint(1L);
                for (File file2 : FileUtils.listFiles(newFolder, (String[]) null, true)) {
                    if (Objects.equals(file2.getParentFile().getName(), "test1")) {
                        Assert.assertTrue(file2.getName().contains(".part-0-2.inprogress.") || file2.getName().equals("part-0-1"));
                    } else if (Objects.equals(file2.getParentFile().getName(), "test2")) {
                        Assert.assertEquals("part-0-0", file2.getName());
                    }
                }
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 6), 6L));
                TestUtils.checkLocalFs(newFolder, 1, 2);
                createCustomRescalingTestSink.snapshot(2L, 2L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test2", 7), 7L));
                TestUtils.checkLocalFs(newFolder, 2, 2);
                for (File file3 : FileUtils.listFiles(newFolder, (String[]) null, true)) {
                    if (Objects.equals(file3.getParentFile().getName(), "test1")) {
                        Assert.assertThat(file3.getName(), CoreMatchers.either(CoreMatchers.containsString(".part-0-2.inprogress.")).or(CoreMatchers.equalTo("part-0-1")));
                    } else if (Objects.equals(file3.getParentFile().getName(), "test2")) {
                        Assert.assertThat(file3.getName(), CoreMatchers.either(CoreMatchers.containsString(".part-0-3.inprogress.")).or(CoreMatchers.equalTo("part-0-0")));
                    }
                }
                createCustomRescalingTestSink.notifyOfCompletedCheckpoint(2L);
                TestUtils.checkLocalFs(newFolder, 1, 3);
                for (File file4 : FileUtils.listFiles(newFolder, (String[]) null, true)) {
                    if (Objects.equals(file4.getParentFile().getName(), "test1")) {
                        Assert.assertThat(file4.getName(), CoreMatchers.either(CoreMatchers.equalTo("part-0-2")).or(CoreMatchers.equalTo("part-0-1")));
                    } else if (Objects.equals(file4.getParentFile().getName(), "test2")) {
                        Assert.assertThat(file4.getName(), CoreMatchers.either(CoreMatchers.containsString(".part-0-3.inprogress.")).or(CoreMatchers.equalTo("part-0-0")));
                    }
                }
                if (createCustomRescalingTestSink != null) {
                    if (0 == 0) {
                        createCustomRescalingTestSink.close();
                        return;
                    }
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createCustomRescalingTestSink != null) {
                if (th != null) {
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createCustomRescalingTestSink.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testCustomRollingPolicy() throws Exception {
        File newFolder = TEMP_FOLDER.newFolder();
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createCustomRescalingTestSink = TestUtils.createCustomRescalingTestSink(newFolder, 1, 0, 10L, new TestUtils.TupleToStringBucketer(), new SimpleStringEncoder(), new RollingPolicy<Tuple2<String, Integer>, String>() { // from class: org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicyTest.1
            private static final long serialVersionUID = 1;

            public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileInfo) {
                return true;
            }

            public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, Tuple2<String, Integer> tuple2) throws IOException {
                return partFileInfo.getSize() > 12;
            }

            public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long j) {
                return false;
            }

            public /* bridge */ /* synthetic */ boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object obj) throws IOException {
                return shouldRollOnEvent((PartFileInfo<String>) partFileInfo, (Tuple2<String, Integer>) obj);
            }
        }, new DefaultBucketFactoryImpl());
        Throwable th = null;
        try {
            try {
                createCustomRescalingTestSink.setup();
                createCustomRescalingTestSink.open();
                createCustomRescalingTestSink.setProcessingTime(0L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test2", 1), 1L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 2L));
                TestUtils.checkLocalFs(newFolder, 3, 0);
                createCustomRescalingTestSink.snapshot(1L, 1L);
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L));
                createCustomRescalingTestSink.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L));
                TestUtils.checkLocalFs(newFolder, 4, 0);
                createCustomRescalingTestSink.snapshot(2L, 2L);
                createCustomRescalingTestSink.notifyOfCompletedCheckpoint(1L);
                TestUtils.checkLocalFs(newFolder, 1, 3);
                if (createCustomRescalingTestSink != null) {
                    if (0 == 0) {
                        createCustomRescalingTestSink.close();
                        return;
                    }
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createCustomRescalingTestSink != null) {
                if (th != null) {
                    try {
                        createCustomRescalingTestSink.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createCustomRescalingTestSink.close();
                }
            }
            throw th4;
        }
    }
}
