/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
import org.apache.flink.runtime.operators.testutils.DelayingIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests for {@link GroupReduceCombineDriver} configured with the
 * {@link DriverStrategy#SORTED_GROUP_COMBINE} strategy.
 *
 * <p>Three scenarios are covered: a regular combine run whose output is verified, a run whose
 * combine function throws an {@link ExpectedTestException}, and the cancellation of a driver
 * that is fed by a slow, infinite input.
 */
class CombineTaskTest
extends UnaryOperatorTestBase<RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
    /** Memory (in bytes) handed to the test base and used to derive the driver's memory fraction. */
    private static final long COMBINE_MEM = 0x300000L;

    /** {@link #COMBINE_MEM} expressed as a fraction of the memory manager's total memory size. */
    private final double combineFrac;

    /** Collects the records emitted by the driver in {@link #testCombineTask()}. */
    private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<>();

    // Tuple2.class is a raw Class token; the double cast is the usual way to obtain a typed one.
    @SuppressWarnings("unchecked")
    private final TypeSerializer<Tuple2<Integer, Integer>> serializer =
            new TupleSerializer<Tuple2<Integer, Integer>>(
                    (Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
                    new TypeSerializer<?>[] {IntSerializer.INSTANCE, IntSerializer.INSTANCE});

    /** Compares tuples on field 0 only, using an {@code IntComparator} constructed with {@code true}. */
    private final TypeComparator<Tuple2<Integer, Integer>> comparator =
            new TupleComparator<Tuple2<Integer, Integer>>(
                    new int[] {0},
                    new TypeComparator<?>[] {new IntComparator(true)},
                    new TypeSerializer<?>[] {IntSerializer.INSTANCE});

    /**
     * Creates the test for the given execution configuration.
     *
     * @param config execution config forwarded to the test base
     */
    CombineTaskTest(ExecutionConfig config) {
        super(config, COMBINE_MEM, 0);
        this.combineFrac = (double) COMBINE_MEM / (double) this.getMemoryManager().getMemorySize();
    }

    /**
     * Runs the combine driver over a finite generated input and checks that exactly one record
     * per key is produced and that every record carries the expected sum.
     */
    @TestTemplate
    void testCombineTask() {
        try {
            int keyCnt = 100;
            int valCnt = 20;
            this.setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), this.serializer);
            this.addDriverComparator(this.comparator);
            this.addDriverComparator(this.comparator);
            this.setOutput(this.outList, this.serializer);
            this.configureSortedGroupCombine();

            GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
                    new GroupReduceCombineDriver<>();
            this.testDriver(testTask, MockCombiningReduceStub.class);

            // Expected per-key sum is 1 + 2 + ... + (valCnt - 1).
            // NOTE(review): presumably the generator emits the values 0..valCnt-1 for each key - confirm.
            int expSum = 0;
            for (int i = 1; i < valCnt; ++i) {
                expSum += i;
            }

            Assertions.assertThat(this.outList).hasSize(keyCnt);
            for (Tuple2<Integer, Integer> record : this.outList) {
                Assertions.assertThat(record.f1).isEqualTo(expSum);
            }
            this.outList.clear();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the failure report shows its stack trace.
            Assertions.fail("Unexpected exception: " + e, e);
        }
    }

    /**
     * Runs the combine driver with a combine function that throws after a number of invocations
     * and checks that an {@link ExpectedTestException} surfaces from the driver run.
     */
    @TestTemplate
    void testFailingCombineTask() {
        try {
            int keyCnt = 100;
            int valCnt = 20;
            this.setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), this.serializer);
            this.addDriverComparator(this.comparator);
            this.addDriverComparator(this.comparator);
            this.setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
            this.configureSortedGroupCombine();

            GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
                    new GroupReduceCombineDriver<>();
            Assertions.assertThatThrownBy(() -> this.testDriver(testTask, MockFailingCombiningReduceStub.class))
                    .isInstanceOf(ExpectedTestException.class);
        }
        catch (Exception e) {
            Assertions.fail("Unexpected exception: " + e, e);
        }
    }

    /**
     * Starts the combine driver on a separate thread with a slow, infinite input, cancels it
     * after a short delay and checks that the thread terminates before the deadline.
     */
    @TestTemplate
    void testCancelCombineTaskSorting() {
        try {
            DelayingIterator<Tuple2<Integer, Integer>> slowInfiniteInput =
                    new DelayingIterator<Tuple2<Integer, Integer>>(new InfiniteIntTupleIterator(), 1);
            this.setInput(slowInfiniteInput, this.serializer);
            this.addDriverComparator(this.comparator);
            this.addDriverComparator(this.comparator);
            this.setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
            this.configureSortedGroupCombine();

            final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
                    new GroupReduceCombineDriver<>();
            Thread taskRunner = new Thread(() -> {
                try {
                    this.testDriver(testTask, MockFailingCombiningReduceStub.class);
                }
                catch (Exception ignored) {
                    // Deliberately ignored: this test only asserts that the thread terminates
                    // after cancel()/interrupt(); whatever the driver throws on the way out is
                    // not part of the assertion.
                }
            });
            taskRunner.start();

            // Give the driver some time to start working before cancelling it.
            Thread.sleep(500L);
            testTask.cancel();

            // Keep interrupting until the runner has terminated or the deadline has passed.
            long deadline = System.currentTimeMillis() + 10000L;
            do {
                taskRunner.interrupt();
                taskRunner.join(5000L);
            } while (taskRunner.isAlive() && System.currentTimeMillis() < deadline);

            Assertions.assertThat(taskRunner.isAlive())
                    .withFailMessage("Task did not cancel properly within 10 seconds.")
                    .isFalse();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag for the test framework before reporting the failure.
            Thread.currentThread().interrupt();
            Assertions.fail("Test thread was interrupted: " + e, e);
        }
        catch (Exception e) {
            Assertions.fail("Unexpected exception: " + e, e);
        }
    }

    /** Applies the driver strategy, memory fraction and file-handle count shared by all tests. */
    private void configureSortedGroupCombine() {
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
        this.getTaskConfig().setRelativeMemoryDriver(this.combineFrac);
        this.getTaskConfig().setFilehandlesDriver(2);
    }

    /**
     * Reduce/combine stub that emits {@code (key, sum - key)} per group and whose
     * {@link #combine} throws an {@link ExpectedTestException} from its tenth invocation on.
     */
    public static final class MockFailingCombiningReduceStub
    implements GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
    GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        /** Number of {@link #combine} invocations seen so far by this instance. */
        private int cnt;

        /**
         * Sums field 1 over the group and emits {@code (key, sum - key)}, where {@code key} is
         * field 0 of the last record seen (0 if the group is empty).
         *
         * @param records the records of one group
         * @param out collector receiving the single result record
         */
        @Override
        public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
            int key = 0;
            int sum = 0;
            for (Tuple2<Integer, Integer> next : records) {
                key = next.f0;
                sum += next.f1;
            }
            int resultValue = sum - key;
            out.collect(new Tuple2<>(key, resultValue));
        }

        /**
         * Same aggregation as {@link #reduce}, but fails once it has been invoked ten times.
         * The input is fully consumed before the invocation counter is checked.
         *
         * @param records the records of one group
         * @param out collector receiving the single result record
         * @throws ExpectedTestException from the tenth invocation on
         */
        @Override
        public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
            int key = 0;
            int sum = 0;
            for (Tuple2<Integer, Integer> next : records) {
                key = next.f0;
                sum += next.f1;
            }
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            int resultValue = sum - key;
            out.collect(new Tuple2<>(key, resultValue));
        }
    }

    /** Reduce/combine stub that emits {@code (key, sum of field 1)} for every group. */
    public static class MockCombiningReduceStub
    implements GroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
    GroupCombineFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        /**
         * Sums field 1 over the group and emits {@code (key, sum)}, where {@code key} is field 0
         * of the last record seen (0 if the group is empty).
         *
         * @param records the records of one group
         * @param out collector receiving the single result record
         */
        @Override
        public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
            int key = 0;
            int sum = 0;
            for (Tuple2<Integer, Integer> next : records) {
                key = next.f0;
                sum += next.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }

        /**
         * Combines a group by delegating to {@link #reduce}.
         *
         * @param records the records of one group
         * @param out collector receiving the single result record
         */
        @Override
        public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
            this.reduce(records, out);
        }
    }
}

