package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.util.BitSet;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.class */
public class StreamOperatorSnapshotRestoreTest extends TestLogger {
    // Recovery modes passed to testOperatorStatesSnapshotRestoreInternal(int).

    // Mode 0: the LocalRecoveryConfig is created with local recovery disabled (the flag is "mode != 0").
    private static final int ONLY_JM_RECOVERY = 0;
    // Mode 1: local recovery enabled; neither snapshot copy is discarded before the restore.
    private static final int TM_AND_JM_RECOVERY = 1;
    // Mode 2: local recovery enabled; the managed keyed state of the job-manager-owned snapshot is
    // discarded before the restore.
    private static final int TM_REMOVE_JM_RECOVERY = 2;
    // Mode 3: local recovery enabled; the managed keyed state of the task-local snapshot is
    // discarded before the restore.
    private static final int JM_REMOVE_TM_RECOVERY = 3;
    // Max parallelism configured on both test harnesses; also the number of elements each test feeds.
    private static final int MAX_PARALLELISM = 10;
    // Shared scratch directory: created in beforeClass(), deleted in afterClass(). Provides the
    // folders for the local recovery directory and for the FsStateBackend.
    protected static TemporaryFolder temporaryFolder;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest$TestOneInputStreamOperator.class */
    public static class TestOneInputStreamOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
        private static final long serialVersionUID = -8942866418598856475L;
        private boolean verifyRestore;
        private ValueState<Integer> keyedState;
        private ListState<Integer> opState;

        public TestOneInputStreamOperator(boolean z) {
            this.verifyRestore = z;
        }

        public void processElement(StreamRecord<Integer> streamRecord) throws Exception {
            if (this.verifyRestore) {
                Assert.assertEquals(((Integer) streamRecord.getValue()).intValue() + StreamOperatorSnapshotRestoreTest.TM_AND_JM_RECOVERY, ((Integer) this.keyedState.value()).intValue());
            } else {
                this.keyedState.update(Integer.valueOf(((Integer) streamRecord.getValue()).intValue() + StreamOperatorSnapshotRestoreTest.TM_AND_JM_RECOVERY));
                this.opState.add(streamRecord.getValue());
            }
        }

        public void processWatermark(Watermark watermark) throws Exception {
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            KeyedStateCheckpointOutputStream rawKeyedOperatorStateOutput = stateSnapshotContext.getRawKeyedOperatorStateOutput();
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(rawKeyedOperatorStateOutput);
            int i = StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY;
            Iterator it = rawKeyedOperatorStateOutput.getKeyGroupList().iterator();
            while (it.hasNext()) {
                int intValue = ((Integer) it.next()).intValue();
                rawKeyedOperatorStateOutput.startNewKeyGroup(intValue);
                dataOutputViewStreamWrapper.writeInt(intValue + StreamOperatorSnapshotRestoreTest.TM_REMOVE_JM_RECOVERY);
                i += StreamOperatorSnapshotRestoreTest.TM_AND_JM_RECOVERY;
            }
            Assert.assertEquals(10L, i);
            OperatorStateCheckpointOutputStream rawOperatorStateOutput = stateSnapshotContext.getRawOperatorStateOutput();
            DataOutputViewStreamWrapper dataOutputViewStreamWrapper2 = new DataOutputViewStreamWrapper(rawOperatorStateOutput);
            for (int i2 = StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY; i2 < 13; i2 += StreamOperatorSnapshotRestoreTest.TM_AND_JM_RECOVERY) {
                rawOperatorStateOutput.startNewPartition();
                dataOutputViewStreamWrapper2.writeInt(42 + i2);
            }
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            Assert.assertEquals(Boolean.valueOf(this.verifyRestore), Boolean.valueOf(stateInitializationContext.isRestored()));
            this.keyedState = stateInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("managed-keyed", Integer.class, Integer.valueOf(StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY)));
            this.opState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("managed-op-state", IntSerializer.INSTANCE));
            if (stateInitializationContext.isRestored()) {
                int i = StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY;
                Iterator it = stateInitializationContext.getRawKeyedStateInputs().iterator();
                while (it.hasNext()) {
                    InputStream stream = ((KeyGroupStatePartitionStreamProvider) it.next()).getStream();
                    Throwable th = StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY;
                    try {
                        try {
                            Assert.assertEquals(r0.getKeyGroupId() + StreamOperatorSnapshotRestoreTest.TM_REMOVE_JM_RECOVERY, new DataInputViewStreamWrapper(stream).readInt());
                            i += StreamOperatorSnapshotRestoreTest.TM_AND_JM_RECOVERY;
                            if (stream != null) {
                                if (th != null) {
                                    try {
                                        stream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    stream.close();
                                }
                            }
                        } catch (Throwable th3) {
                            if (stream != null) {
                                if (th != null) {
                                    try {
                                        stream.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    stream.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        th = th5;
                        throw th5;
                    }
                }
                Assert.assertEquals(10L, i);
                BitSet bitSet = new BitSet(StreamOperatorSnapshotRestoreTest.MAX_PARALLELISM);
                Iterator it2 = ((Iterable) this.opState.get()).iterator();
                while (it2.hasNext()) {
                    bitSet.set(((Integer) it2.next()).intValue());
                }
                Assert.assertEquals(10L, bitSet.cardinality());
                BitSet bitSet2 = new BitSet(13);
                Iterator it3 = stateInitializationContext.getRawOperatorStateInputs().iterator();
                while (it3.hasNext()) {
                    InputStream stream2 = ((StatePartitionStreamProvider) it3.next()).getStream();
                    Throwable th6 = StreamOperatorSnapshotRestoreTest.ONLY_JM_RECOVERY;
                    try {
                        try {
                            bitSet2.set(new DataInputViewStreamWrapper(stream2).readInt() - 42);
                            if (stream2 != null) {
                                if (th6 != null) {
                                    try {
                                        stream2.close();
                                    } catch (Throwable th7) {
                                        th6.addSuppressed(th7);
                                    }
                                } else {
                                    stream2.close();
                                }
                            }
                        } catch (Throwable th8) {
                            th6 = th8;
                            throw th8;
                        }
                    } catch (Throwable th9) {
                        if (stream2 != null) {
                            if (th6 != null) {
                                try {
                                    stream2.close();
                                } catch (Throwable th10) {
                                    th6.addSuppressed(th10);
                                }
                            } else {
                                stream2.close();
                            }
                        }
                        throw th9;
                    }
                }
                Assert.assertEquals(13L, bitSet2.cardinality());
            }
        }
    }

    /**
     * Creates the shared temporary folder once, before any test of this class runs.
     *
     * @throws IOException if the folder cannot be created
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        // Assign before create() so the field is already set if create() fails.
        StreamOperatorSnapshotRestoreTest.temporaryFolder = new TemporaryFolder();
        StreamOperatorSnapshotRestoreTest.temporaryFolder.create();
    }

    /** Deletes the shared temporary folder after all tests of this class have run. */
    @AfterClass
    public static void afterClass() {
        final TemporaryFolder folder = temporaryFolder;
        folder.delete();
    }

    /** Runs the shared snapshot/restore scenario in {@code ONLY_JM_RECOVERY} mode. */
    @Test
    public void testOperatorStatesSnapshotRestore() throws Exception {
        final int recoveryMode = ONLY_JM_RECOVERY;
        testOperatorStatesSnapshotRestoreInternal(recoveryMode);
    }

    /** Runs the shared snapshot/restore scenario in {@code TM_AND_JM_RECOVERY} mode. */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalState() throws Exception {
        final int recoveryMode = TM_AND_JM_RECOVERY;
        testOperatorStatesSnapshotRestoreInternal(recoveryMode);
    }

    /** Runs the shared snapshot/restore scenario in {@code TM_REMOVE_JM_RECOVERY} mode. */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedJM() throws Exception {
        final int recoveryMode = TM_REMOVE_JM_RECOVERY;
        testOperatorStatesSnapshotRestoreInternal(recoveryMode);
    }

    /** Runs the shared snapshot/restore scenario in {@code JM_REMOVE_TM_RECOVERY} mode. */
    @Test
    public void testOperatorStatesSnapshotRestoreWithLocalStateDeletedTM() throws Exception {
        final int recoveryMode = JM_REMOVE_TM_RECOVERY;
        testOperatorStatesSnapshotRestoreInternal(recoveryMode);
    }

    /**
     * Runs one snapshot/restore round trip of {@link TestOneInputStreamOperator}.
     *
     * <p>A first harness runs the operator in write mode, feeds it the elements
     * {@code 0 .. MAX_PARALLELISM - 1} and takes a snapshot with local state. Depending on the mode,
     * the managed keyed state of one of the two snapshot copies is discarded. A second harness is
     * then initialized from both copies and runs the operator in verify mode over the same elements.
     *
     * @param mode one of {@code ONLY_JM_RECOVERY}, {@code TM_AND_JM_RECOVERY},
     *     {@code TM_REMOVE_JM_RECOVERY} or {@code JM_REMOVE_TM_RECOVERY}
     * @throws Exception if harness setup, snapshotting or restoring fails
     */
    private void testOperatorStatesSnapshotRestoreInternal(int mode) throws Exception {
        final int subtaskIndex = 0;
        final int numSubtasks = 1;

        // ---------------------------------------------------------------- snapshot

        final StateBackend stateBackend = createStateBackend();
        final TestOneInputStreamOperator writingOperator = new TestOneInputStreamOperator(false);
        final JobID jobID = new JobID();
        final JobVertexID jobVertexID = new JobVertexID();

        // Local recovery is enabled for every mode except ONLY_JM_RECOVERY.
        final LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(
                mode != ONLY_JM_RECOVERY,
                new LocalRecoveryDirectoryProviderImpl(temporaryFolder.newFolder(), jobID, jobVertexID, subtaskIndex));

        final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> snapshotHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        writingOperator,
                        value -> value,
                        TypeInformation.of(Integer.class),
                        new MockEnvironmentBuilder()
                                .setJobID(jobID)
                                .setJobVertexID(jobVertexID)
                                .setTaskName("test")
                                .setMemorySize(1048576L)
                                .setInputSplitProvider(new MockInputSplitProvider())
                                .setBufferSize(StreamTaskTestHarness.DEFAULT_MEMORY_MANAGER_SIZE)
                                .setTaskStateManager(new TestTaskStateManager(localRecoveryConfig))
                                .setMaxParallelism(MAX_PARALLELISM)
                                .setSubtaskIndex(subtaskIndex)
                                .setUserCodeClassLoader(getClass().getClassLoader())
                                .build());
        snapshotHarness.setStateBackend(stateBackend);
        snapshotHarness.open();
        for (int element = 0; element < MAX_PARALLELISM; element++) {
            snapshotHarness.processElement(new StreamRecord<>(element));
        }
        final OperatorSnapshotFinalizer snapshot = snapshotHarness.snapshotWithLocalState(1L, 1L);
        snapshotHarness.close();

        // ----------------------------------------------------------------- restore

        final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> restoreHarness =
                new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
                        new TestOneInputStreamOperator(true),
                        value -> value,
                        TypeInformation.of(Integer.class),
                        MAX_PARALLELISM,
                        numSubtasks,
                        subtaskIndex) {
                    @Override
                    protected StreamTaskStateInitializer createStreamTaskStateManager(Environment environment, StateBackend stateBackend, ProcessingTimeService processingTimeService) {
                        return new StreamTaskStateInitializerImpl(environment, stateBackend, processingTimeService) {
                            // Always yields no timer service manager for this harness.
                            protected <K> InternalTimeServiceManager<K> internalTimeServiceManager(AbstractKeyedStateBackend<K> abstractKeyedStateBackend, KeyContext keyContext, Iterable<KeyGroupStatePartitionStreamProvider> iterable) throws Exception {
                                return null;
                            }
                        };
                    }
                };
        restoreHarness.setStateBackend(stateBackend);

        final OperatorSubtaskState jobManagerOwnedState = snapshot.getJobManagerOwnedState();
        final OperatorSubtaskState taskLocalState = snapshot.getTaskLocalState();

        // Task-local state must exist exactly when local recovery was enabled.
        final boolean hasTaskLocalState = taskLocalState != null && taskLocalState.hasState();
        Assert.assertEquals(mode > ONLY_JM_RECOVERY, hasTaskLocalState);

        // Drop one copy of the managed keyed state so the restore has to use the other one.
        if (mode == TM_REMOVE_JM_RECOVERY) {
            jobManagerOwnedState.getManagedKeyedState().discardState();
        } else if (mode == JM_REMOVE_TM_RECOVERY) {
            taskLocalState.getManagedKeyedState().discardState();
        }

        restoreHarness.initializeState(jobManagerOwnedState, taskLocalState);
        restoreHarness.open();
        for (int element = 0; element < MAX_PARALLELISM; element++) {
            restoreHarness.processElement(new StreamRecord<>(element));
        }
        restoreHarness.close();
    }

    /**
     * Supplies the state backend used by both harnesses of a test run. This implementation
     * delegates to {@link #createStateBackendInternal()}.
     *
     * @return the state backend to test against
     * @throws IOException if the backend's directory cannot be created
     */
    protected StateBackend createStateBackend() throws IOException {
        final StateBackend backend = createStateBackendInternal();
        return backend;
    }

    /**
     * Builds an {@link FsStateBackend} pointing at a fresh sub-folder of the shared temporary
     * folder.
     *
     * @return a new file-system state backend
     * @throws IOException if the sub-folder cannot be created
     */
    protected final FsStateBackend createStateBackendInternal() throws IOException {
        final FsStateBackend backend = new FsStateBackend(temporaryFolder.newFolder().toURI());
        return backend;
    }

    /*
     * Lambda deserialization hook: maps a SerializedLambda back to one of the two identity key
     * selector lambdas used in testOperatorStatesSnapshotRestoreInternal.
     *
     * The implementation method name selects a case; the case then checks that the serialized form
     * describes a KeySelector#getKey implementation declared in this class with the signature
     * (Integer)Integer before returning a fresh identity lambda. Anything else is rejected with an
     * IllegalArgumentException.
     *
     * NOTE(review): the "synthetic" marker and the name suggest this is a decompiler reconstruction
     * of a compiler-generated method, not hand-written code. As written it is not valid Java source:
     * "boolean z = -1", the switch on that boolean, and lambdas returned where the target type is
     * Object. Presumably the method should be dropped from the source so the compiler can generate
     * it again - confirm before relying on it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; -1 means "no known lambda matched".
        boolean z = -1;
        // First stage: switch on the name's hash code, then confirm with equals() to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1093124031:
                if (implMethodName.equals("lambda$testOperatorStatesSnapshotRestoreInternal$1f5675f4$1")) {
                    z = TM_AND_JM_RECOVERY;
                    break;
                }
                break;
            case 1132682549:
                if (implMethodName.equals("lambda$testOperatorStatesSnapshotRestoreInternal$2f7060f5$1")) {
                    z = ONLY_JM_RECOVERY;
                    break;
                }
                break;
        }
        // Second stage: validate the serialized description. The recovery-mode constants are used
        // here only as the case labels 0 and 1; method kind 6 is MethodHandleInfo.REF_invokeStatic.
        switch (z) {
            case ONLY_JM_RECOVERY /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num2 -> {
                        return num2;
                    };
                }
                break;
            case TM_AND_JM_RECOVERY /* 1 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return num;
                    };
                }
                break;
        }
        // Unknown method name or mismatching description.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
