/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class CoBroadcastWithNonKeyedOperatorTest {
    /** Descriptor of the String-to-Integer broadcast state shared by the test functions of this class. */
    private static final MapStateDescriptor<String, Integer> STATE_DESCRIPTOR =
            new MapStateDescriptor<>("broadcast-state", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    /** Descriptor of a second, Integer-to-String broadcast state; registered only by the multi-state test. */
    private static final MapStateDescriptor<Integer, String> STATE_DESCRIPTOR_A =
            new MapStateDescriptor<>("broadcast-state-A", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /** Package-private no-argument constructor; the class holds no instance state. */
    CoBroadcastWithNonKeyedOperatorTest() {
    }

    /**
     * Verifies that a single function can write to, and later read from, two differently typed
     * broadcast states ({@code STATE_DESCRIPTOR} and {@code STATE_DESCRIPTOR_A}).
     *
     * <p>Two integers are sent on input 2 (the broadcast side), then one string on input 1, which
     * makes {@code FunctionWithMultipleStates} emit one record per entry of each state.
     */
    @Test
    void testMultiStateSupport() throws Exception {
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                getInitializedTestHarness(new FunctionWithMultipleStates(), STATE_DESCRIPTOR, STATE_DESCRIPTOR_A)) {
            testHarness.processElement2(new StreamRecord<>(5, 12L));
            testHarness.processElement2(new StreamRecord<>(6, 13L));
            testHarness.processElement1(new StreamRecord<>("9", 15L));

            // Element type is Object so that it matches the type of testHarness.getOutput().
            // NOTE(review): the expected order of the first two records ("key.6" before "key.5") is
            // not the insertion order; presumably it mirrors the iteration order of the state's
            // backing map - confirm before changing keys or values.
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
            expectedOutput.add(new StreamRecord<>("9:key.6->6", 15L));
            expectedOutput.add(new StreamRecord<>("9:key.5->5", 15L));
            expectedOutput.add(new StreamRecord<>("9:5->value.5", 15L));
            expectedOutput.add(new StreamRecord<>("9:6->value.6", 15L));

            // This compares the main output, so the message must not talk about side outputs.
            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    /**
     * Drives the operator with interleaved watermarks and elements on both inputs and checks the
     * exact sequence of forwarded watermarks and emitted records.
     *
     * <p>{@code TestFunction} additionally asserts, on every non-broadcast element, that the
     * broadcast state holds exactly the registered keys.
     */
    @Test
    void testBroadcastState() throws Exception {
        final Set<String> keysToRegister = new HashSet<>(Arrays.asList("test1", "test2", "test3"));

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> harness =
                getInitializedTestHarness(new TestFunction(keysToRegister), STATE_DESCRIPTOR)) {
            // One watermark is expected on the output for each pair of per-input watermarks.
            final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(Arrays.<Object>asList(
                    new Watermark(10L),
                    new StreamRecord<>("5WM:10 TS:12", 12L),
                    new Watermark(40L),
                    new StreamRecord<>("6WM:40 TS:13", 13L),
                    new StreamRecord<>("6WM:40 TS:15", 15L),
                    new Watermark(50L)));

            harness.processWatermark1(new Watermark(10L));
            harness.processWatermark2(new Watermark(10L));
            // Broadcast side: registers the keys and emits a record.
            harness.processElement2(new StreamRecord<>(5, 12L));

            harness.processWatermark1(new Watermark(40L));
            harness.processWatermark2(new Watermark(40L));
            // Non-broadcast side: two records with different timestamps.
            harness.processElement1(new StreamRecord<>("6", 13L));
            harness.processElement1(new StreamRecord<>("6", 15L));

            harness.processWatermark1(new Watermark(50L));
            harness.processWatermark2(new Watermark(50L));

            TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, harness.getOutput());
        }
    }

    /**
     * Verifies that records emitted through {@code ctx.output(...)} end up in the side output that
     * belongs to the used tag: broadcast-side records under
     * {@code FunctionWithSideOutput.BROADCAST_TAG}, non-broadcast-side records under
     * {@code FunctionWithSideOutput.NON_BROADCAST_TAG}.
     */
    @Test
    void testSideOutput() throws Exception {
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness =
                getInitializedTestHarness(new FunctionWithSideOutput(), STATE_DESCRIPTOR)) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2(new StreamRecord<>(5, 12L));

            testHarness.processWatermark1(new Watermark(40L));
            testHarness.processWatermark2(new Watermark(40L));
            testHarness.processElement1(new StreamRecord<>("6", 13L));
            testHarness.processElement1(new StreamRecord<>("6", 15L));

            testHarness.processWatermark1(new Watermark(50L));
            testHarness.processWatermark2(new Watermark(50L));

            // The expected queues use the same element type as the queues returned by
            // getSideOutput(...), so both arguments of assertOutputEquals agree on their type.
            ConcurrentLinkedQueue<StreamRecord<String>> expectedBr = new ConcurrentLinkedQueue<>();
            expectedBr.add(new StreamRecord<>("BR:5 WM:10 TS:12", 12L));

            ConcurrentLinkedQueue<StreamRecord<String>> expectedNonBr = new ConcurrentLinkedQueue<>();
            expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:13", 13L));
            expectedNonBr.add(new StreamRecord<>("NON-BR:6 WM:40 TS:15", 15L));

            ConcurrentLinkedQueue<StreamRecord<String>> brSideOutput =
                    testHarness.getSideOutput(FunctionWithSideOutput.BROADCAST_TAG);
            ConcurrentLinkedQueue<StreamRecord<String>> nonBrSideOutput =
                    testHarness.getSideOutput(FunctionWithSideOutput.NON_BROADCAST_TAG);

            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedBr, brSideOutput);
            TestHarnessUtil.assertOutputEquals("Wrong Side Output", expectedNonBr, nonBrSideOutput);
        }
    }

    /**
     * Checks that broadcast state survives a rescale from two to three subtasks.
     *
     * <p>Two subtasks (max parallelism 10) each receive the broadcast element {@code 3}; their
     * snapshots are merged and repartitioned for three subtasks. After restoring, every new subtask
     * must emit exactly three records on a non-broadcast element, each being one of
     * {@code test1=3}, {@code test2=3}, {@code test3=3}.
     */
    @Test
    void testScaleUp() throws Exception {
        final Set<String> keysToRegister = new HashSet<>(Arrays.asList("test1", "test2", "test3"));

        // Phase 1: fill the broadcast state at parallelism 2 and snapshot both subtasks.
        OperatorSubtaskState mergedSnapshot;
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> oldSubtask0 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 2, 0, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> oldSubtask1 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 2, 1, STATE_DESCRIPTOR)) {
            oldSubtask0.processElement2(new StreamRecord<>(3));
            oldSubtask1.processElement2(new StreamRecord<>(3));
            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
                    oldSubtask0.snapshot(0L, 0L), oldSubtask1.snapshot(0L, 0L));
        }

        final Set<String> expected = new HashSet<>(Arrays.asList("test1=3", "test2=3", "test3=3"));

        // Phase 2: split the merged snapshot into the initial state of three new subtasks.
        final OperatorSubtaskState initState1 = repartitionInitState(mergedSnapshot, 10, 2, 3, 0);
        final OperatorSubtaskState initState2 = repartitionInitState(mergedSnapshot, 10, 2, 3, 1);
        final OperatorSubtaskState initState3 = repartitionInitState(mergedSnapshot, 10, 2, 3, 2);

        // Phase 3: restore at parallelism 3 and dump the state of every subtask.
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> newSubtask0 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 0, initState1, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> newSubtask1 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 1, initState2, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> newSubtask2 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 2, initState3, STATE_DESCRIPTOR)) {
            newSubtask0.processElement1(new StreamRecord<>("trigger"));
            newSubtask1.processElement1(new StreamRecord<>("trigger"));
            newSubtask2.processElement1(new StreamRecord<>("trigger"));

            // Same checks for every subtask, in subtask order.
            for (TwoInputStreamOperatorTestHarness<String, Integer, String> subtask :
                    Arrays.asList(newSubtask0, newSubtask1, newSubtask2)) {
                final ConcurrentLinkedQueue<Object> output = subtask.getOutput();
                Assertions.assertThat(output).hasSameSizeAs(expected);
                for (Object element : output) {
                    final StreamRecord<?> record = (StreamRecord<?>) element;
                    Assertions.assertThat((String) record.getValue()).isIn(expected);
                }
            }
        }
    }

    /**
     * Checks that broadcast state survives a rescale from three to two subtasks.
     *
     * <p>Three subtasks (max parallelism 10) each receive the broadcast element {@code 3}; their
     * snapshots are merged and repartitioned for two subtasks. After restoring, every remaining
     * subtask must emit exactly three records on a non-broadcast element, each being one of
     * {@code test1=3}, {@code test2=3}, {@code test3=3}.
     */
    @Test
    void testScaleDown() throws Exception {
        final Set<String> keysToRegister = new HashSet<>(Arrays.asList("test1", "test2", "test3"));

        // Phase 1: fill the broadcast state at parallelism 3 and snapshot all subtasks.
        OperatorSubtaskState mergedSnapshot;
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> oldSubtask0 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 0, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> oldSubtask1 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 1, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> oldSubtask2 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 3, 2, STATE_DESCRIPTOR)) {
            oldSubtask0.processElement2(new StreamRecord<>(3));
            oldSubtask1.processElement2(new StreamRecord<>(3));
            oldSubtask2.processElement2(new StreamRecord<>(3));
            mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
                    oldSubtask0.snapshot(0L, 0L), oldSubtask1.snapshot(0L, 0L), oldSubtask2.snapshot(0L, 0L));
        }

        final Set<String> expected = new HashSet<>(Arrays.asList("test1=3", "test2=3", "test3=3"));

        // Phase 2: split the merged snapshot into the initial state of two new subtasks.
        final OperatorSubtaskState initState1 = repartitionInitState(mergedSnapshot, 10, 3, 2, 0);
        final OperatorSubtaskState initState2 = repartitionInitState(mergedSnapshot, 10, 3, 2, 1);

        // Phase 3: restore at parallelism 2 and dump the state of every subtask.
        try (TwoInputStreamOperatorTestHarness<String, Integer, String> newSubtask0 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 2, 0, initState1, STATE_DESCRIPTOR);
                TwoInputStreamOperatorTestHarness<String, Integer, String> newSubtask1 =
                        getInitializedTestHarness(new TestFunctionWithOutput(keysToRegister), 10, 2, 1, initState2, STATE_DESCRIPTOR)) {
            newSubtask0.processElement1(new StreamRecord<>("trigger"));
            newSubtask1.processElement1(new StreamRecord<>("trigger"));

            // Same checks for every subtask, in subtask order.
            for (TwoInputStreamOperatorTestHarness<String, Integer, String> subtask :
                    Arrays.asList(newSubtask0, newSubtask1)) {
                final ConcurrentLinkedQueue<Object> output = subtask.getOutput();
                Assertions.assertThat(output).hasSameSizeAs(expected);
                for (Object element : output) {
                    final StreamRecord<?> record = (StreamRecord<?>) element;
                    Assertions.assertThat((String) record.getValue()).isIn(expected);
                }
            }
        }
    }

    /**
     * Verifies that requesting keyed state ({@code ValueState}) while processing a broadcast-side
     * element is rejected with a {@link NullPointerException} carrying the "can only be used on a
     * 'keyed stream'" message.
     *
     * <p>The assertion runs inside {@code processBroadcastElement}, which is triggered by the
     * single element sent on input 2.
     */
    @Test
    void testNoKeyedStateOnBroadcastSide() throws Exception {
        final ValueStateDescriptor<String> valueState =
                new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                new BroadcastProcessFunction<String, Integer, String>() {
                    private static final long serialVersionUID = -1725365436500098384L;

                    @Override
                    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                        Assertions.assertThatThrownBy(() -> getRuntimeContext().getState(valueState).value())
                                .isInstanceOf(NullPointerException.class)
                                .hasMessage(String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.", valueState.getName(), valueState.getType()));
                    }

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // Intentionally empty: only the broadcast side is under test.
                    }
                })) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement2(new StreamRecord<>(5, 12L));
        }
    }

    /**
     * Verifies that requesting keyed state ({@code ValueState}) while processing a
     * non-broadcast-side element is rejected with a {@link NullPointerException} carrying the
     * "can only be used on a 'keyed stream'" message.
     *
     * <p>The assertion runs inside {@code processElement}, which is triggered by the single
     * element sent on input 1.
     */
    @Test
    void testNoKeyedStateOnNonBroadcastSide() throws Exception {
        final ValueStateDescriptor<String> valueState =
                new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);

        try (TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = getInitializedTestHarness(
                new BroadcastProcessFunction<String, Integer, String>() {
                    private static final long serialVersionUID = -1725365436500098384L;

                    @Override
                    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                        // Intentionally empty: only the non-broadcast side is under test.
                    }

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        Assertions.assertThatThrownBy(() -> getRuntimeContext().getState(valueState).value())
                                .isInstanceOf(NullPointerException.class)
                                .hasMessage(String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.", valueState.getName(), valueState.getType()));
                    }
                })) {
            testHarness.processWatermark1(new Watermark(10L));
            testHarness.processWatermark2(new Watermark(10L));
            testHarness.processElement1(new StreamRecord<>("5", 12L));
        }
    }

    /**
     * Creates an opened harness for a single-subtask setup: max parallelism 1, one task, subtask
     * index 0, and no initial state.
     *
     * @param function the function to wrap in the operator under test
     * @param descriptors the broadcast state descriptors to register with the operator
     * @return the harness, already set up, initialized and opened
     * @throws Exception if creating or opening the harness fails
     */
    private static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?> ... descriptors) throws Exception {
        final int maxParallelism = 1;
        final int numTasks = 1;
        final int taskIdx = 0;
        return getInitializedTestHarness(function, maxParallelism, numTasks, taskIdx, descriptors);
    }

    /**
     * Creates an opened harness for one subtask of a parallel setup, starting without any restored
     * state.
     *
     * @param function the function to wrap in the operator under test
     * @param maxParallelism the max parallelism handed to the harness
     * @param numTasks the number of parallel subtasks
     * @param taskIdx the index of the subtask this harness represents
     * @param descriptors the broadcast state descriptors to register with the operator
     * @return the harness, already set up, initialized and opened
     * @throws Exception if creating or opening the harness fails
     */
    private static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(BroadcastProcessFunction<IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx, MapStateDescriptor<?, ?> ... descriptors) throws Exception {
        // A typed null makes it explicit that the overload taking an initial state is the target.
        final OperatorSubtaskState noInitialState = null;
        return getInitializedTestHarness(function, maxParallelism, numTasks, taskIdx, noInitialState, descriptors);
    }

    /**
     * Computes the initial state of one subtask after a rescale by delegating to
     * {@code AbstractStreamOperatorTestHarness.repartitionOperatorState}.
     *
     * @param initState the (merged) state taken before the rescale
     * @param numKeyGroups the number of key groups, passed through unchanged
     * @param oldParallelism the parallelism the state was taken with
     * @param newParallelism the parallelism to restore with
     * @param subtaskIndex the index of the new subtask the state is computed for
     * @return the state assigned to the given new subtask
     */
    private static OperatorSubtaskState repartitionInitState(OperatorSubtaskState initState, int numKeyGroups, int oldParallelism, int newParallelism, int subtaskIndex) {
        final OperatorSubtaskState stateForSubtask = AbstractStreamOperatorTestHarness.repartitionOperatorState(
                initState, numKeyGroups, oldParallelism, newParallelism, subtaskIndex);
        return stateForSubtask;
    }

    /**
     * Wraps the given function in a {@code CoBroadcastWithNonKeyedOperator}, builds a test harness
     * around it and brings the harness into the opened state (setup, state initialization, open).
     *
     * @param function the function to wrap; must not be {@code null}
     * @param maxParallelism the max parallelism handed to the harness
     * @param numTasks the number of parallel subtasks
     * @param taskIdx the index of the subtask this harness represents
     * @param initState the state to initialize from; the overloads without this parameter pass
     *     {@code null}
     * @param descriptors the broadcast state descriptors to register with the operator
     * @return the harness, already set up, initialized and opened
     * @throws Exception if setting up, initializing or opening the harness fails
     */
    private static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> getInitializedTestHarness(BroadcastProcessFunction<IN1, IN2, OUT> function, int maxParallelism, int numTasks, int taskIdx, OperatorSubtaskState initState, MapStateDescriptor<?, ?> ... descriptors) throws Exception {
        CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> operator =
                new CoBroadcastWithNonKeyedOperator<>(Preconditions.checkNotNull(function), Arrays.asList(descriptors));
        TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness =
                new TwoInputStreamOperatorTestHarness<>(operator, maxParallelism, numTasks, taskIdx);
        testHarness.setup();
        testHarness.initializeState(initState);
        testHarness.open();
        return testHarness;
    }

    private static class TestFunctionWithOutput
    extends BroadcastProcessFunction<String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final Set<String> keysToRegister;

        TestFunctionWithOutput(Set<String> keysToRegister) {
            this.keysToRegister = (Set)Preconditions.checkNotNull(keysToRegister);
        }

        public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            for (String k : this.keysToRegister) {
                ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)k, (Object)value);
            }
        }

        public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                out.collect((Object)entry.toString());
            }
        }
    }

    private static class FunctionWithSideOutput
    extends BroadcastProcessFunction<String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        static final OutputTag<String> BROADCAST_TAG = new OutputTag<String>("br-out"){
            private static final long serialVersionUID = 8037335313997479800L;
        };
        static final OutputTag<String> NON_BROADCAST_TAG = new OutputTag<String>("non-br-out"){
            private static final long serialVersionUID = -1092362442658548175L;
        };

        private FunctionWithSideOutput() {
        }

        public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.output(BROADCAST_TAG, (Object)("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            ctx.output(NON_BROADCAST_TAG, (Object)("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }
    }

    private static class TestFunction
    extends BroadcastProcessFunction<String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;
        private final Set<String> keysToRegister;

        TestFunction(Set<String> keysToRegister) {
            this.keysToRegister = (Set)Preconditions.checkNotNull(keysToRegister);
        }

        public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            for (String k : this.keysToRegister) {
                ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)k, (Object)value);
            }
            out.collect((Object)(value + "WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            HashSet<String> retrievedKeySet = new HashSet<String>();
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                retrievedKeySet.add((String)entry.getKey());
            }
            Assertions.assertThat(retrievedKeySet).isEqualTo(this.keysToRegister);
            out.collect((Object)(value + "WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()));
        }
    }

    private static class FunctionWithMultipleStates
    extends BroadcastProcessFunction<String, Integer, String> {
        private static final long serialVersionUID = 7496674620398203933L;

        private FunctionWithMultipleStates() {
        }

        public void processBroadcastElement(Integer value, BroadcastProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.getBroadcastState(STATE_DESCRIPTOR).put((Object)("key." + value), (Object)value);
            ctx.getBroadcastState(STATE_DESCRIPTOR_A).put((Object)value, (Object)("value." + value));
        }

        public void processElement(String value, BroadcastProcessFunction.ReadOnlyContext ctx, Collector<String> out) throws Exception {
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                out.collect((Object)(value + ":" + (String)entry.getKey() + "->" + String.valueOf(entry.getValue())));
            }
            for (Map.Entry entry : ctx.getBroadcastState(STATE_DESCRIPTOR_A).immutableEntries()) {
                out.collect((Object)(value + ":" + String.valueOf(entry.getKey()) + "->" + (String)entry.getValue()));
            }
        }
    }
}

