/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TestFinishedOnRestoreStreamOperator;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CompletingCheckpointResponder;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class MultipleInputStreamTaskChainedSourcesCheckpointingTest {
    private static final int MAX_STEPS = 100;
    @Parameter
    private boolean objectReuse;
    private final CheckpointMetaData metaData = new CheckpointMetaData(1L, System.currentTimeMillis());

    MultipleInputStreamTaskChainedSourcesCheckpointingTest() {
    }

    @Parameters(name="objectReuse = {0}")
    private static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    @TestTemplate
    void testSourceCheckpointFirst() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(this.objectReuse);){
            testHarness.setAutoProcess(false);
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            CheckpointBarrier barrier = this.createBarrier(testHarness);
            this.addRecordsAndBarriers(testHarness, barrier);
            CompletableFuture checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(this.metaData, barrier.getCheckpointOptions());
            this.processSingleStepUntil(testHarness, checkpointFuture::isDone);
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            ArrayList<Object> actualOutput = new ArrayList<Object>(testHarness.getOutput());
            Assertions.assertThat(actualOutput.subList(0, expectedOutput.size())).containsExactlyInAnyOrderElementsOf(expectedOutput);
            Assertions.assertThat((Object)actualOutput.get(expectedOutput.size())).isEqualTo((Object)barrier);
        }
    }

    @TestTemplate
    void testSourceCheckpointFirstUnaligned() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(true, this.objectReuse);){
            testHarness.setAutoProcess(false);
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            this.addRecords(testHarness);
            CheckpointBarrier barrier = this.createBarrier(testHarness);
            CompletableFuture checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(this.metaData, barrier.getCheckpointOptions());
            this.processSingleStepUntil(testHarness, checkpointFuture::isDone);
            Assertions.assertThat(testHarness.getOutput()).containsExactly(new Object[]{barrier});
            testHarness.processAll();
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            ArrayList<Object> actualOutput = new ArrayList<Object>(testHarness.getOutput());
            Assertions.assertThat(actualOutput.subList(1, expectedOutput.size() + 1)).containsExactlyInAnyOrderElementsOf(expectedOutput);
        }
    }

    @TestTemplate
    void testSourceCheckpointLast() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(this.objectReuse);){
            testHarness.setAutoProcess(false);
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            CheckpointBarrier barrier = this.createBarrier(testHarness);
            this.addRecordsAndBarriers(testHarness, barrier);
            testHarness.processAll();
            CompletableFuture checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(this.metaData, barrier.getCheckpointOptions());
            this.processSingleStepUntil(testHarness, checkpointFuture::isDone);
            expectedOutput.add(new StreamRecord((Object)"42", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"42", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"42", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            ArrayList<Object> actualOutput = new ArrayList<Object>(testHarness.getOutput());
            Assertions.assertThat(actualOutput.subList(0, expectedOutput.size())).containsExactlyInAnyOrderElementsOf(expectedOutput);
            Assertions.assertThat((Object)actualOutput.get(expectedOutput.size())).isEqualTo((Object)barrier);
        }
    }

    @TestTemplate
    void testSourceCheckpointLastUnaligned() throws Exception {
        boolean unaligned = true;
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness(unaligned, this.objectReuse);){
            testHarness.setAutoProcess(false);
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            this.addNetworkRecords(testHarness);
            CheckpointBarrier barrier = this.createBarrier(testHarness);
            this.addBarriers(testHarness, barrier);
            testHarness.processAll();
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 1337, 1337, 1337);
            testHarness.processAll();
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(barrier);
            Assertions.assertThat(testHarness.getOutput()).containsExactlyInAnyOrderElementsOf(expectedOutput);
        }
    }

    @TestTemplate
    void testStopWithSavepointDrainWaitsForSourcesFinish() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).setCollectNetworkEvents().modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).modifyStreamConfig(config -> config.setCheckpointingEnabled(true)).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(4, true)).build();){
            testHarness.setAutoProcess(false);
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            CheckpointBarrier barrier = this.createStopWithSavepointDrainBarrier();
            testHarness.processElement(new StreamRecord((Object)"44", Long.MIN_VALUE), 0);
            testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 0);
            testHarness.processEvent((AbstractEvent)barrier, 0);
            testHarness.processElement(new StreamRecord((Object)47.0, Long.MIN_VALUE), 1);
            testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 1);
            testHarness.processEvent((AbstractEvent)barrier, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, Boundedness.CONTINUOUS_UNBOUNDED, 1, 2);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 2, Boundedness.CONTINUOUS_UNBOUNDED, 3, 4);
            testHarness.processAll();
            CompletableFuture checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(this.metaData, barrier.getCheckpointOptions());
            this.processSingleStepUntil(testHarness, checkpointFuture::isDone);
            expectedOutput.add(new StreamRecord((Object)"3", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"47.0", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"44", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"1", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"4", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"2", Long.MIN_VALUE));
            ArrayList<Object> actualOutput = new ArrayList<Object>(testHarness.getOutput());
            Assertions.assertThat(actualOutput.subList(0, expectedOutput.size())).containsExactlyInAnyOrderElementsOf(expectedOutput);
            Assertions.assertThat(actualOutput.subList(actualOutput.size() - 3, actualOutput.size())).containsExactly(new Object[]{new StreamRecord((Object)"FINISH"), new EndOfData(StopMode.DRAIN), barrier});
        }
    }

    @TestTemplate
    void testOnlyOneSource() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(1)).build();){
            testHarness.setAutoProcess(false);
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 0, 42, 43, 44);
            this.processSingleStepUntil(testHarness, () -> !testHarness.getOutput().isEmpty());
            expectedOutput.add(new StreamRecord((Object)"42", Long.MIN_VALUE));
            CheckpointBarrier barrier = this.createBarrier(testHarness);
            CompletableFuture checkpointFuture = testHarness.getStreamTask().triggerCheckpointAsync(this.metaData, barrier.getCheckpointOptions());
            this.processSingleStepUntil(testHarness, checkpointFuture::isDone);
            ArrayList<Object> actualOutput = new ArrayList<Object>(testHarness.getOutput());
            Assertions.assertThat(actualOutput.subList(0, expectedOutput.size())).containsExactlyInAnyOrderElementsOf(expectedOutput);
            Assertions.assertThat((Object)actualOutput.get(expectedOutput.size())).isEqualTo((Object)barrier);
        }
    }

    @TestTemplate
    void testTriggerAlignedNoTimeoutCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        this.testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.alignedNoTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
    }

    @TestTemplate
    void testTriggerUnalignedCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        this.testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.unaligned((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
    }

    @TestTemplate
    void testTriggerAlignedWithTimeoutCheckpointWithFinishedChannelsAndSourceChain() throws Exception {
        this.testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions.alignedWithTimeout((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (long)10L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testTriggerCheckpointWithFinishedChannelsAndSourceChain(CheckpointOptions checkpointOptions) throws Exception {
        ResultPartition[] partitionWriters = new ResultPartition[2];
        try {
            for (int i = 0; i < partitionWriters.length; ++i) {
                partitionWriters[i] = PartitionTestUtils.createPartition((ResultPartitionType)ResultPartitionType.PIPELINED_BOUNDED);
                partitionWriters[i].setup();
            }
            CompletingCheckpointResponder checkpointResponder = new CompletingCheckpointResponder();
            try (StreamTaskMailboxTestHarness<String> testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(config -> {
                config.setCheckpointingEnabled(true);
                config.setUnalignedCheckpointsEnabled(checkpointOptions.isUnalignedCheckpoint() || checkpointOptions.isTimeoutable());
            }).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).setCheckpointResponder(checkpointResponder).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MultipleInputStreamTaskTest.LifeCycleTrackingMockSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).addAdditionalOutput((ResultPartitionWriter[])partitionWriters).setupOperatorChain((StreamOperatorFactory<?>)new MultipleInputStreamTaskTest.MapToStringMultipleInputOperatorFactory(4)).finishForSingletonOperatorChain((TypeSerializer)StringSerializer.INSTANCE)).build();){
                checkpointResponder.setHandlers(arg_0 -> testHarness.streamTask.notifyCheckpointCompleteAsync(arg_0), (arg_0, arg_1) -> testHarness.streamTask.notifyCheckpointAbortAsync(arg_0, arg_1));
                testHarness.getStreamTask().getCheckpointBarrierHandler().get();
                CompletableFuture<Boolean> checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 2L, checkpointOptions);
                testHarness.processAll();
                testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 0, 0);
                testHarness.processEvent((AbstractEvent)new EndOfData(StopMode.DRAIN), 1, 0);
                testHarness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0, 0);
                testHarness.processEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 1, 0);
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat((long)testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(2L);
                checkpointFuture = StreamTaskFinalCheckpointsTest.triggerCheckpoint(testHarness, 4L, checkpointOptions);
                checkpointFuture.thenAccept(ignored -> {
                    for (ResultPartition resultPartition : partitionWriters) {
                        resultPartition.onSubpartitionAllDataProcessed(0);
                    }
                });
                testHarness.processAll();
                testHarness.finishProcessing();
                Assertions.assertThat(checkpointFuture).isDone();
                testHarness.getTaskStateManager().getWaitForReportLatch().await();
                Assertions.assertThat((long)testHarness.getTaskStateManager().getReportedCheckpointId()).isEqualTo(4L);
                for (ResultPartition resultPartition : partitionWriters) {
                    Assertions.assertThat((int)resultPartition.getNumberOfQueuedBuffers()).isEqualTo(3);
                }
            }
        }
        finally {
            for (ResultPartition writer : partitionWriters) {
                if (writer == null) continue;
                writer.close();
            }
        }
    }

    @TestTemplate
    void testSkipExecutionsIfFinishedOnRestoreWithSourceChained() throws Exception {
        OperatorID firstSourceOperatorId = new OperatorID();
        OperatorID secondSourceOperatorId = new OperatorID();
        OperatorID nonSourceOperatorId = new OperatorID();
        ArrayList output = new ArrayList();
        try (StreamTaskMailboxTestHarness testHarness = ((StreamTaskMailboxTestHarnessBuilder)new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyStreamConfig(config -> config.setCheckpointingEnabled(true)).modifyExecutionConfig(MultipleInputStreamTaskTest.applyObjectReuse(this.objectReuse)).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addAdditionalOutput(new ResultPartitionWriter[]{new RecordOrEventCollectingResultPartitionWriter<StreamElement>(output, (TypeSerializer)new StreamElementSerializer((TypeSerializer)IntSerializer.INSTANCE)){

            public void notifyEndOfData(StopMode mode) throws IOException {
                this.broadcastEvent((AbstractEvent)new EndOfData(mode), false);
            }
        }}).addSourceInput(firstSourceOperatorId, new SourceOperatorFactory((Source)new SourceOperatorStreamTaskTest.LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).addSourceInput(secondSourceOperatorId, new SourceOperatorFactory((Source)new SourceOperatorStreamTaskTest.LifeCycleMonitorSource(Boundedness.CONTINUOUS_UNBOUNDED, 1), WatermarkStrategy.noWatermarks()), BasicTypeInfo.INT_TYPE_INFO).setTaskStateSnapshot(1L, TaskStateSnapshot.FINISHED_ON_RESTORE).setupOperatorChain(nonSourceOperatorId, (StreamOperatorFactory<?>)new LifeCycleMonitorMultipleInputOperatorFactory()).chain((OneInputStreamOperator)new TestFinishedOnRestoreStreamOperator(), (TypeSerializer)StringSerializer.INSTANCE).finish()).build();){
            testHarness.processElement(Watermark.MAX_WATERMARK);
            Assertions.assertThat(output).isEmpty();
            testHarness.waitForTaskCompletion();
            Assertions.assertThat(output).containsExactly(new Object[]{Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)});
            for (StreamOperatorWrapper wrapper : testHarness.getStreamTask().operatorChain.getAllOperators()) {
                if (!(wrapper.getStreamOperator() instanceof SourceOperator)) continue;
                SourceOperatorStreamTaskTest.LifeCycleMonitorSourceReader sourceReader = (SourceOperatorStreamTaskTest.LifeCycleMonitorSourceReader)((SourceOperator)wrapper.getStreamOperator()).getSourceReader();
                sourceReader.getLifeCycleMonitor().assertCallTimes(0, LifeCycleMonitor.LifeCyclePhase.values());
            }
        }
    }

    private void addRecordsAndBarriers(StreamTaskMailboxTestHarness<String> testHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        this.addRecords(testHarness);
        this.addBarriers(testHarness, checkpointBarrier);
    }

    private CheckpointBarrier createStopWithSavepointDrainBarrier() {
        CheckpointOptions checkpointOptions = CheckpointOptions.alignedNoTimeout((SnapshotType)SavepointType.terminate((SavepointFormatType)SavepointFormatType.CANONICAL), (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault());
        return new CheckpointBarrier(this.metaData.getCheckpointId(), this.metaData.getTimestamp(), checkpointOptions);
    }

    private CheckpointBarrier createBarrier(StreamTaskMailboxTestHarness<String> testHarness) {
        StreamConfig config = testHarness.getStreamTask().getConfiguration();
        CheckpointOptions checkpointOptions = CheckpointOptions.forConfig((SnapshotType)CheckpointType.CHECKPOINT, (CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault(), (boolean)config.isExactlyOnceCheckpointMode(), (boolean)config.isUnalignedCheckpointsEnabled(), (long)config.getAlignedCheckpointTimeout().toMillis());
        return new CheckpointBarrier(this.metaData.getCheckpointId(), this.metaData.getTimestamp(), checkpointOptions);
    }

    private void addBarriers(StreamTaskMailboxTestHarness<String> testHarness, CheckpointBarrier checkpointBarrier) throws Exception {
        testHarness.processEvent((AbstractEvent)checkpointBarrier, 0);
        testHarness.processEvent((AbstractEvent)checkpointBarrier, 1);
    }

    private void addRecords(StreamTaskMailboxTestHarness<String> testHarness) throws Exception {
        MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 42, 42, 42);
        this.addNetworkRecords(testHarness);
    }

    private void addNetworkRecords(StreamTaskMailboxTestHarness<String> testHarness) throws Exception {
        testHarness.processElement(new StreamRecord((Object)"44", Long.MIN_VALUE), 0);
        testHarness.processElement(new StreamRecord((Object)"44", Long.MIN_VALUE), 0);
        testHarness.processElement(new StreamRecord((Object)47.0, Long.MIN_VALUE), 1);
        testHarness.processElement(new StreamRecord((Object)47.0, Long.MIN_VALUE), 1);
    }

    private void processSingleStepUntil(StreamTaskMailboxTestHarness<String> testHarness, Supplier<Boolean> condition) throws Exception {
        Assertions.assertThat((Boolean)condition.get()).isFalse();
        for (int i = 0; i < 100 && !condition.get().booleanValue(); ++i) {
            testHarness.processSingleStep();
        }
        Assertions.assertThat((Boolean)condition.get()).isTrue();
    }

    static class LifeCycleMonitorMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        LifeCycleMonitorMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new LifeCycleMonitorMultipleInputOperator());
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return LifeCycleMonitorMultipleInputOperator.class;
        }
    }

    static class LifeCycleMonitorMultipleInputOperator
    extends TestFinishedOnRestoreStreamOperator
    implements MultipleInputStreamOperator<String> {
        public List<Input> getInputs() {
            ArrayList<Input> inputs = new ArrayList<Input>();
            inputs.add(new TestFinishedOnRestoreInput());
            inputs.add(new TestFinishedOnRestoreInput());
            inputs.add(new TestFinishedOnRestoreInput());
            return inputs;
        }

        private static class TestFinishedOnRestoreInput
        implements Input {
            private TestFinishedOnRestoreInput() {
            }

            public void processElement(StreamRecord element) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processWatermark(Watermark mark) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
                throw new IllegalStateException("This should never be called");
            }

            public void setKeyContextElement(StreamRecord record) throws Exception {
                throw new IllegalStateException("This should never be called");
            }
        }
    }
}

