/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sortpartition;

import java.io.Serializable;
import java.util.LinkedList;
import java.util.Objects;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.sortpartition.KeyedSortPartitionOperator;
import org.apache.flink.streaming.api.operators.sortpartition.SortPartitionOperatorTest;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link KeyedSortPartitionOperator}.
 *
 * <p>Each scenario wraps an operator in a {@link OneInputStreamOperatorTestHarness}, feeds it two
 * records in descending order, ends the input and checks what the harness collected.
 */
class KeyedSortPartitionOperatorTest {

    /**
     * Feeds two out-of-order records to operators configured with a positional field, a named
     * field and a key selector, and expects each to emit the records in ascending order.
     *
     * @throws Exception if the harness fails while opening, processing or closing
     */
    @Test
    void testSortPartition() throws Exception {
        // 1. Operator sorting tuples by the field at position 0.
        KeyedSortPartitionOperator<Tuple2<Integer, String>, String> operator1 = createSortPartitionOperatorWithPositionField();
        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness1 = new OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String>((OneInputStreamOperator<Tuple2<Integer, String>, String>)operator1);
        StreamConfig streamConfig1 = testHarness1.getStreamConfig();
        // NOTE(review): the explicit cast on null is kept from the original; presumably it picks
        // between createSerializer overloads - confirm before removing.
        streamConfig1.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null));
        streamConfig1.serializeAllConfigs();
        LinkedList<Object> expectedOutput1 = new LinkedList<>();
        testHarness1.open();
        testHarness1.processElement(new StreamRecord<>(Tuple2.of(3, "3")));
        testHarness1.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
        testHarness1.endInput();
        testHarness1.close();
        expectedOutput1.add(new StreamRecord<>(Tuple2.of(1, "1")));
        expectedOutput1.add(new StreamRecord<>(Tuple2.of(3, "3")));
        TestHarnessUtil.assertOutputEquals("The sort partition result is not correct.", expectedOutput1, testHarness1.getOutput());

        // 2. Operator sorting POJOs by the field named "value".
        KeyedSortPartitionOperator<TestPojo, String> operator2 = createSortPartitionOperatorWithStringField();
        OneInputStreamOperatorTestHarness<TestPojo, String> testHarness2 = new OneInputStreamOperatorTestHarness<TestPojo, String>((OneInputStreamOperator<TestPojo, String>)operator2);
        StreamConfig streamConfig2 = testHarness2.getStreamConfig();
        streamConfig2.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null));
        streamConfig2.serializeAllConfigs();
        LinkedList<Object> expectedOutput2 = new LinkedList<>();
        testHarness2.open();
        testHarness2.processElement(new StreamRecord<>(new TestPojo("3", 3)));
        testHarness2.processElement(new StreamRecord<>(new TestPojo("1", 1)));
        testHarness2.endInput();
        testHarness2.close();
        // Expectations use this class's own TestPojo so the comparison does not depend on which
        // side of equals() the assertion helper happens to call.
        expectedOutput2.add(new StreamRecord<>(new TestPojo("1", 1)));
        expectedOutput2.add(new StreamRecord<>(new TestPojo("3", 3)));
        TestHarnessUtil.assertOutputEquals("The sort partition result is not correct.", expectedOutput2, testHarness2.getOutput());

        // 3. Operator sorting POJOs by the result of TestPojo::getValue.
        KeyedSortPartitionOperator<TestPojo, String> operator3 = createSortPartitionOperatorWithKeySelector();
        OneInputStreamOperatorTestHarness<TestPojo, String> testHarness3 = new OneInputStreamOperatorTestHarness<TestPojo, String>((OneInputStreamOperator<TestPojo, String>)operator3);
        StreamConfig streamConfig3 = testHarness3.getStreamConfig();
        streamConfig3.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null));
        streamConfig3.serializeAllConfigs();
        LinkedList<Object> expectedOutput3 = new LinkedList<>();
        testHarness3.open();
        testHarness3.processElement(new StreamRecord<>(new TestPojo("3", 3)));
        testHarness3.processElement(new StreamRecord<>(new TestPojo("1", 1)));
        testHarness3.endInput();
        testHarness3.close();
        expectedOutput3.add(new StreamRecord<>(new TestPojo("1", 1)));
        expectedOutput3.add(new StreamRecord<>(new TestPojo("3", 3)));
        TestHarnessUtil.assertOutputEquals("The sort partition result is not correct.", expectedOutput3, testHarness3.getOutput());
    }

    /**
     * Opens the operator, processes a single record, ends the input and closes the harness, then
     * checks that the harness output is not empty.
     *
     * @throws Exception if the harness fails while opening, processing or closing
     */
    @Test
    void testOpenClose() throws Exception {
        KeyedSortPartitionOperator<Tuple2<Integer, String>, String> sortPartitionOperator = createSortPartitionOperatorWithPositionField();
        OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness = new OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String>((OneInputStreamOperator<Tuple2<Integer, String>, String>)sortPartitionOperator);
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setStateKeySerializer(Types.STRING.createSerializer((SerializerConfig) null));
        streamConfig.serializeAllConfigs();
        testHarness.open();
        testHarness.processElement(new StreamRecord<>(Tuple2.of(1, "1")));
        testHarness.endInput();
        testHarness.close();
        Assertions.assertThat(testHarness.getOutput()).isNotEmpty();
    }

    /** Creates an operator over (Integer, String) tuples sorting ascending by field position 0. */
    private KeyedSortPartitionOperator<Tuple2<Integer, String>, String> createSortPartitionOperatorWithPositionField() {
        TypeInformation<Tuple2<Integer, String>> inputType = Types.TUPLE(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        int positionSortField = 0;
        Order sortOrder = Order.ASCENDING;
        return new KeyedSortPartitionOperator<>(inputType, positionSortField, sortOrder);
    }

    /** Creates an operator over {@link TestPojo} sorting ascending by the field named "value". */
    private KeyedSortPartitionOperator<TestPojo, String> createSortPartitionOperatorWithStringField() {
        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
        String stringSortField = "value";
        Order sortOrder = Order.ASCENDING;
        return new KeyedSortPartitionOperator<>(inputType, stringSortField, sortOrder);
    }

    /** Creates an operator over {@link TestPojo} sorting ascending by {@link TestPojo#getValue()}. */
    private KeyedSortPartitionOperator<TestPojo, String> createSortPartitionOperatorWithKeySelector() {
        TypeInformation<TestPojo> inputType = Types.POJO(TestPojo.class);
        Order sortOrder = Order.ASCENDING;
        return new KeyedSortPartitionOperator<>(inputType, TestPojo::getValue, sortOrder);
    }

    /**
     * Simple serializable record with a String key and an Integer value, used as test input.
     *
     * <p>The public fields, no-argument constructor and accessors are kept as they were.
     */
    public static class TestPojo
    implements Serializable {
        public String key;
        public Integer value;

        public TestPojo() {
        }

        public TestPojo(String key, Integer value) {
            this.key = key;
            this.value = value;
        }

        public Integer getValue() {
            return this.value;
        }

        public void setValue(Integer value) {
            this.value = value;
        }

        public String getKey() {
            return this.key;
        }

        public void setKey(String key) {
            this.key = key;
        }

        /**
         * Two pojos are equal when both are {@code TestPojo} instances with equal key and value.
         * Null keys or values are compared without throwing.
         */
        @Override
        public boolean equals(Object object) {
            if (this == object) {
                return true;
            }
            if (!(object instanceof TestPojo)) {
                return false;
            }
            TestPojo other = (TestPojo) object;
            return Objects.equals(this.getKey(), other.getKey())
                    && Objects.equals(this.getValue(), other.getValue());
        }

        /** Hash derived from the same fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(this.getKey(), this.getValue());
        }
    }
}

