/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.DataSourceTask;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;

public class DataSourceTaskTest
extends TaskTestBase {
    private static final int MEMORY_MANAGER_SIZE = 0x100000;
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private List<Record> outList;

    @Test
    void testDataSourceTask() throws IOException {
        int keyCnt = 100;
        int valCnt = 20;
        this.outList = new ArrayList<Record>();
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), tempTestFile, true);
        super.initEnvironment(0x100000L, 1024);
        super.addOutput(this.outList);
        DataSourceTask testTask = new DataSourceTask((Environment)this.mockEnv);
        super.registerFileInputTask((AbstractInvokable)testTask, MockInputFormat.class, tempTestFile.toURI().toString(), "\n");
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            System.err.println(e);
            Assertions.fail((String)"Invoke method caused exception.");
        }
        try {
            Field formatField = DataSourceTask.class.getDeclaredField("format");
            formatField.setAccessible(true);
            MockInputFormat inputFormat = (MockInputFormat)((Object)formatField.get(testTask));
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)inputFormat.opened).withFailMessage("Invalid status of the input format. Expected for opened: true, Actual: %b", new Object[]{inputFormat.opened})).isTrue();
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)inputFormat.closed).withFailMessage("Invalid status of the input format. Expected for closed: true, Actual: %b", new Object[]{inputFormat.closed})).isTrue();
        }
        catch (Exception e) {
            System.err.println(e);
            Assertions.fail((String)"Reflection error while trying to validate inputFormat status.");
        }
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Invalid output size. Expected: %d, Actual: %d", new Object[]{keyCnt * valCnt, this.outList.size()})).hasSize(keyCnt * valCnt);
        HashMap keyValueCountMap = new HashMap(keyCnt);
        for (Record kvp : this.outList) {
            int key = ((IntValue)kvp.getField(0, IntValue.class)).getValue();
            int val = ((IntValue)kvp.getField(1, IntValue.class)).getValue();
            if (!keyValueCountMap.containsKey(key)) {
                keyValueCountMap.put(key, new HashSet());
            }
            ((HashSet)keyValueCountMap.get(key)).add(val);
        }
        ((MapAssert)Assertions.assertThat(keyValueCountMap).withFailMessage("Invalid key count in out file. Expected: %d, Actual: %d", new Object[]{keyCnt, keyValueCountMap.size()})).hasSize(keyCnt);
        for (Integer mapKey : keyValueCountMap.keySet()) {
            ((AbstractCollectionAssert)Assertions.assertThat((Collection)((Collection)keyValueCountMap.get(mapKey))).withFailMessage("Invalid value count for key: %d. Expected: %d, Actual: %d", new Object[]{mapKey, valCnt, ((HashSet)keyValueCountMap.get(mapKey)).size()})).hasSize(valCnt);
        }
    }

    @Test
    void testFailingDataSourceTask() throws IOException {
        int keyCnt = 20;
        int valCnt = 10;
        this.outList = new NirvanaOutputList();
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), tempTestFile, false);
        super.initEnvironment(0x100000L, 1024);
        super.addOutput(this.outList);
        DataSourceTask testTask = new DataSourceTask((Environment)this.mockEnv);
        super.registerFileInputTask((AbstractInvokable)testTask, MockFailingInputFormat.class, tempTestFile.toURI().toString(), "\n");
        boolean stubFailed = false;
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            stubFailed = true;
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)stubFailed).withFailMessage("Function exception was not forwarded.", new Object[0])).isTrue();
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file does not exist", new Object[0])).exists();
    }

    @Test
    void testCancelDataSourceTask() throws IOException {
        int keyCnt = 20;
        int valCnt = 4;
        super.initEnvironment(0x100000L, 1024);
        super.addOutput(new NirvanaOutputList());
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), tempTestFile, false);
        final DataSourceTask testTask = new DataSourceTask((Environment)this.mockEnv);
        super.registerFileInputTask((AbstractInvokable)testTask, MockDelayingInputFormat.class, tempTestFile.toURI().toString(), "\n");
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    testTask.invoke();
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                    Assertions.fail((String)"Task threw exception although it was properly canceled");
                }
            }
        };
        taskRunner.start();
        TaskCancelThread tct = new TaskCancelThread(1, taskRunner, (AbstractInvokable)testTask);
        tct.start();
        try {
            tct.join();
            taskRunner.join();
        }
        catch (InterruptedException ie) {
            Assertions.fail((String)"Joining threads failed");
        }
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file does not exist", new Object[0])).exists();
    }

    public static class MockFailingInputFormat
    extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private int cnt = 0;

        public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
            if (this.cnt == 10) {
                throw new RuntimeException("Excpected Test Exception.");
            }
            ++this.cnt;
            String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
            try {
                this.key.setValue(Integer.parseInt(line.substring(0, line.indexOf("_"))));
                this.value.setValue(Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length())));
            }
            catch (RuntimeException re) {
                return null;
            }
            target.setField(0, (Value)this.key);
            target.setField(1, (Value)this.value);
            return target;
        }
    }

    public static class MockDelayingInputFormat
    extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                return null;
            }
            String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
            try {
                this.key.setValue(Integer.parseInt(line.substring(0, line.indexOf("_"))));
                this.value.setValue(Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length())));
            }
            catch (RuntimeException re) {
                return null;
            }
            target.setField(0, (Value)this.key);
            target.setField(1, (Value)this.value);
            return target;
        }
    }

    public static class MockInputFormat
    extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private boolean opened = false;
        private boolean closed = false;

        public Record readRecord(Record target, byte[] record, int offset, int numBytes) {
            String line = new String(record, offset, numBytes, ConfigConstants.DEFAULT_CHARSET);
            try {
                this.key.setValue(Integer.parseInt(line.substring(0, line.indexOf("_"))));
                this.value.setValue(Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length())));
            }
            catch (RuntimeException re) {
                return null;
            }
            target.setField(0, (Value)this.key);
            target.setField(1, (Value)this.value);
            return target;
        }

        public void openInputFormat() {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.opened).withFailMessage("Invalid status of the input format. Expected for opened: false, Actual: %b", new Object[]{this.opened})).isFalse();
            this.opened = true;
        }

        public void closeInputFormat() {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.closed).withFailMessage("Invalid status of the input format. Expected for closed: false, Actual: %b", new Object[]{this.closed})).isFalse();
            this.closed = true;
        }
    }

    public static class InputFilePreparator {
        public static void prepareInputFile(MutableObjectIterator<Record> inIt, File inputFile, boolean insertInvalidData) throws IOException {
            try (BufferedWriter bw = new BufferedWriter(new FileWriter(inputFile));){
                if (insertInvalidData) {
                    bw.write("####_I_AM_INVALID_########\n");
                }
                Record rec = new Record();
                while ((rec = (Record)inIt.next((Object)rec)) != null) {
                    IntValue key = (IntValue)rec.getField(0, IntValue.class);
                    IntValue value = (IntValue)rec.getField(1, IntValue.class);
                    bw.write(key.getValue() + "_" + value.getValue() + "\n");
                }
                if (insertInvalidData) {
                    bw.write("####_I_AM_INVALID_########\n");
                }
                bw.flush();
            }
        }
    }
}

