/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class KeyedProcessOperatorTest {
    KeyedProcessOperatorTest() {
    }

    @Test
    void testKeyQuerying() throws Exception {
        class KeyQueryingProcessFunction
        extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
            KeyQueryingProcessFunction() {
            }

            public void processElement(Tuple2<Integer, String> value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
                ((AbstractIntegerAssert)Assertions.assertThat((Integer)((Integer)ctx.getCurrentKey())).as("Did not get expected key.", new Object[0])).isEqualTo(value.f0);
                out.collect((Object)((String)value.f1));
            }
        }
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new KeyQueryingProcessFunction());
        try (KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector & Serializable)in -> (Integer)in.f0, BasicTypeInfo.INT_TYPE_INFO);){
            testHarness.setup();
            testHarness.open();
            testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5"), 12L));
            testHarness.processElement(new StreamRecord((Object)Tuple2.of((Object)42, (Object)"42"), 13L));
            ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
            expectedOutput.add(new StreamRecord((Object)"5", 12L));
            expectedOutput.add(new StreamRecord((Object)"42", 13L));
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testTimestampAndWatermarkQuerying() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processWatermark(new Watermark(17L));
        testHarness.processElement(new StreamRecord((Object)5, 12L));
        testHarness.processWatermark(new Watermark(42L));
        testHarness.processElement(new StreamRecord((Object)6, 13L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(17L));
        expectedOutput.add(new StreamRecord((Object)"5TIME:17 TS:12", 12L));
        expectedOutput.add(new Watermark(42L));
        expectedOutput.add(new StreamRecord((Object)"6TIME:42 TS:13", 13L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testTimestampAndProcessingTimeQuerying() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(17L);
        testHarness.processElement(new StreamRecord((Object)5));
        testHarness.setProcessingTime(42L);
        testHarness.processElement(new StreamRecord((Object)6));
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"5TIME:17 TS:null"));
        expectedOutput.add(new StreamRecord((Object)"6TIME:42 TS:null"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testEventTimeTimers() throws Exception {
        int expectedKey = 17;
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME, 17));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processWatermark(new Watermark(0L));
        testHarness.processElement(new StreamRecord((Object)17, 42L));
        testHarness.processWatermark(new Watermark(5L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(0L));
        expectedOutput.add(new StreamRecord((Object)17, 42L));
        expectedOutput.add(new StreamRecord((Object)1777, 5L));
        expectedOutput.add(new Watermark(5L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testProcessingTimeTimers() throws Exception {
        int expectedKey = 17;
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME, 17));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)17));
        testHarness.setProcessingTime(5L);
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)17));
        expectedOutput.add(new StreamRecord((Object)1777));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testEventTimeTimerWithState() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processWatermark(new Watermark(1L));
        testHarness.processElement(new StreamRecord((Object)17, 0L));
        testHarness.processElement(new StreamRecord((Object)13, 0L));
        testHarness.processWatermark(new Watermark(2L));
        testHarness.processElement(new StreamRecord((Object)42, 1L));
        testHarness.processElement(new StreamRecord((Object)13, 1L));
        testHarness.processWatermark(new Watermark(6L));
        testHarness.processWatermark(new Watermark(7L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new Watermark(1L));
        expectedOutput.add(new StreamRecord((Object)"INPUT:17", 0L));
        expectedOutput.add(new StreamRecord((Object)"INPUT:13", 0L));
        expectedOutput.add(new Watermark(2L));
        expectedOutput.add(new StreamRecord((Object)"INPUT:42", 1L));
        expectedOutput.add(new StreamRecord((Object)"STATE:17", 6L));
        expectedOutput.add(new Watermark(6L));
        expectedOutput.add(new StreamRecord((Object)"STATE:42", 7L));
        expectedOutput.add(new Watermark(7L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testProcessingTimeTimerWithState() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(1L);
        testHarness.processElement(new StreamRecord((Object)17));
        testHarness.processElement(new StreamRecord((Object)13));
        testHarness.setProcessingTime(2L);
        testHarness.processElement(new StreamRecord((Object)13));
        testHarness.processElement(new StreamRecord((Object)42));
        testHarness.setProcessingTime(6L);
        testHarness.setProcessingTime(7L);
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"INPUT:17"));
        expectedOutput.add(new StreamRecord((Object)"INPUT:13"));
        expectedOutput.add(new StreamRecord((Object)"INPUT:42"));
        expectedOutput.add(new StreamRecord((Object)"STATE:17"));
        expectedOutput.add(new StreamRecord((Object)"STATE:42"));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testSnapshotAndRestore() throws Exception {
        int expectedKey = 5;
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new BothTriggeringFlatMapFunction(5));
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)5, 12L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        operator = new KeyedProcessOperator((KeyedProcessFunction)new BothTriggeringFlatMapFunction(5));
        testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.setProcessingTime(5L);
        testHarness.processWatermark(new Watermark(6L));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)"PROC:1777"));
        expectedOutput.add(new StreamRecord((Object)"EVENT:1777", 6L));
        expectedOutput.add(new Watermark(6L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    @Test
    void testNullOutputTagRefusal() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new NullOutputTagEmittingProcessFunction());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.setProcessingTime(17L);
        try {
            Assertions.assertThatThrownBy(() -> testHarness.processElement(new StreamRecord((Object)5))).isInstanceOf(IllegalArgumentException.class);
        }
        finally {
            testHarness.close();
        }
    }

    @Test
    void testSideOutput() throws Exception {
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)new SideOutputProcessFunction());
        KeyedOneInputStreamOperatorTestHarness testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, new IdentityKeySelector(), BasicTypeInfo.INT_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement(new StreamRecord((Object)42, 17L));
        ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedOutput.add(new StreamRecord((Object)"IN:42", 17L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        ConcurrentLinkedQueue<StreamRecord> expectedIntSideOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedIntSideOutput.add(new StreamRecord((Object)42, 17L));
        ConcurrentLinkedQueue<StreamRecord<Integer>> intSideOutput = testHarness.getSideOutput(SideOutputProcessFunction.INTEGER_OUTPUT_TAG);
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedIntSideOutput, intSideOutput);
        ConcurrentLinkedQueue<StreamRecord> expectedLongSideOutput = new ConcurrentLinkedQueue<StreamRecord>();
        expectedLongSideOutput.add(new StreamRecord((Object)42L, 17L));
        ConcurrentLinkedQueue<StreamRecord<Long>> longSideOutput = testHarness.getSideOutput(SideOutputProcessFunction.LONG_OUTPUT_TAG);
        TestHarnessUtil.assertOutputEquals("Side output was not correct.", expectedLongSideOutput, longSideOutput);
        testHarness.close();
    }

    private static class BothTriggeringFlatMapFunction
    extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;
        private final Integer expectedKey;

        public BothTriggeringFlatMapFunction(Integer expectedKey) {
            this.expectedKey = expectedKey;
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
            TimerService timerService = ctx.timerService();
            timerService.registerProcessingTimeTimer(3L);
            timerService.registerEventTimeTimer(4L);
            timerService.registerProcessingTimeTimer(5L);
            timerService.registerEventTimeTimer(6L);
            timerService.deleteProcessingTimeTimer(3L);
            timerService.deleteEventTimeTimer(4L);
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Integer)((Integer)ctx.getCurrentKey())).isEqualTo((Object)this.expectedKey);
            if (TimeDomain.EVENT_TIME.equals((Object)ctx.timeDomain())) {
                out.collect((Object)"EVENT:1777");
            } else {
                out.collect((Object)"PROC:1777");
            }
        }
    }

    private static class TriggeringStatefulFlatMapFunction
    extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;
        private final ValueStateDescriptor<Integer> state = new ValueStateDescriptor("seen-element", (TypeSerializer)IntSerializer.INSTANCE);
        private final TimeDomain expectedTimeDomain;

        public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
            this.expectedTimeDomain = timeDomain;
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
            TimerService timerService = ctx.timerService();
            ValueState state = this.getRuntimeContext().getState(this.state);
            if (state.value() == null) {
                out.collect((Object)("INPUT:" + value));
                state.update((Object)value);
                if (this.expectedTimeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                    timerService.registerEventTimeTimer(timerService.currentWatermark() + 5L);
                } else {
                    timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5L);
                }
            } else {
                state.clear();
                if (this.expectedTimeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                    timerService.deleteEventTimeTimer(timerService.currentWatermark() + 4L);
                } else {
                    timerService.deleteProcessingTimeTimer(timerService.currentProcessingTime() + 4L);
                }
            }
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)this.expectedTimeDomain);
            out.collect((Object)("STATE:" + String.valueOf(this.getRuntimeContext().getState(this.state).value())));
        }
    }

    private static class TriggeringFlatMapFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
        private static final long serialVersionUID = 1L;
        private final TimeDomain expectedTimeDomain;
        private final Integer expectedKey;

        public TriggeringFlatMapFunction(TimeDomain timeDomain, Integer expectedKey) {
            this.expectedTimeDomain = timeDomain;
            this.expectedKey = expectedKey;
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Integer> out) throws Exception {
            out.collect((Object)value);
            if (this.expectedTimeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5L);
            } else {
                ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5L);
            }
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            Assertions.assertThat((Integer)((Integer)ctx.getCurrentKey())).isEqualTo((Object)this.expectedKey);
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)this.expectedTimeDomain);
            out.collect((Object)1777);
        }
    }

    private static class QueryingFlatMapFunction
    extends KeyedProcessFunction<Integer, Integer, String> {
        private static final long serialVersionUID = 1L;
        private final TimeDomain expectedTimeDomain;

        public QueryingFlatMapFunction(TimeDomain timeDomain) {
            this.expectedTimeDomain = timeDomain;
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
            if (this.expectedTimeDomain.equals((Object)TimeDomain.EVENT_TIME)) {
                out.collect((Object)(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
            } else {
                out.collect((Object)(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
            }
        }

        public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class SideOutputProcessFunction
    extends KeyedProcessFunction<Integer, Integer, String> {
        static final OutputTag<Integer> INTEGER_OUTPUT_TAG = new OutputTag<Integer>("int-out"){};
        static final OutputTag<Long> LONG_OUTPUT_TAG = new OutputTag<Long>("long-out"){};

        private SideOutputProcessFunction() {
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("IN:" + value));
            ctx.output(INTEGER_OUTPUT_TAG, (Object)value);
            ctx.output(LONG_OUTPUT_TAG, (Object)value.longValue());
        }
    }

    private static class NullOutputTagEmittingProcessFunction
    extends KeyedProcessFunction<Integer, Integer, String> {
        private NullOutputTagEmittingProcessFunction() {
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.output(null, (Object)value);
        }
    }
}

