/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.metadata;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer;
import org.apache.flink.runtime.state.ChangelogTestUtils;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Round-trip tests for {@link MetadataV3Serializer}: checkpoint metadata and individual state
 * handles are serialized to an in-memory buffer, deserialized again, and compared with the
 * original objects.
 */
class MetadataV3SerializerTest {
    /** Number of randomized round trips executed by each looping test. */
    private static final int NUM_ROUNDS = 100;

    /** Upper bound (inclusive) for the number of randomly generated master states. */
    private static final int MAX_NUM_MASTER_STATES = 5;

    /** Upper bound (inclusive) for the number of randomly generated task (operator) states. */
    private static final int MAX_TASK_STATES = 20;

    /** Upper bound (inclusive) for the number of subtasks per generated operator state. */
    private static final int MAX_NUM_SUBTASKS = 20;

    /** Upper bound (inclusive) for the number of operator states whose tasks are all running. */
    private static final int MAX_ALL_RUNNING_TASK_STATES = 20;

    /** Upper bound (inclusive) for the number of partly finished operator states. */
    private static final int MAX_PARTLY_FINISHED_STATES = 10;

    /** Upper bound (inclusive) for the number of fully finished operator states. */
    private static final int MAX_FULLY_FINISHED_STATES = 10;

    /** Name of the empty marker file created below the base path before deserialization. */
    private static final String METADATA_FILE_NAME = "_metadata";

    @TempDir
    private java.nio.file.Path temporaryFolder;

    /** Round-trips metadata that carries neither operator state nor master state. */
    @Test
    void testCheckpointWithNoState() throws Exception {
        Random rnd = new Random();
        for (int i = 0; i < NUM_ROUNDS; ++i) {
            long checkpointId = randomCheckpointId(rnd);
            List<OperatorState> taskStates = Collections.emptyList();
            List<MasterState> masterStates = Collections.emptyList();
            testCheckpointSerialization(checkpointId, taskStates, masterStates, null);
        }
    }

    /** Round-trips metadata that carries only randomly generated master states. */
    @Test
    void testCheckpointWithOnlyMasterState() throws Exception {
        Random rnd = new Random();
        for (int i = 0; i < NUM_ROUNDS; ++i) {
            long checkpointId = randomCheckpointId(rnd);
            List<OperatorState> operatorStates = Collections.emptyList();
            int numMasterStates = rnd.nextInt(MAX_NUM_MASTER_STATES) + 1;
            Collection<MasterState> masterStates =
                    CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
            testCheckpointSerialization(checkpointId, operatorStates, masterStates, null);
        }
    }

    @Test
    void testCheckpointWithOnlyTaskStateForCheckpoint() throws Exception {
        testCheckpointWithOnlyTaskState(null);
    }

    @Test
    void testCheckpointWithOnlyTaskStateForSavepoint() throws Exception {
        testCheckpointWithOnlyTaskState(newSavepointBasePath());
    }

    /**
     * Round-trips metadata that carries only operator states.
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null} for the plain checkpoint variant
     */
    private void testCheckpointWithOnlyTaskState(String basePath) throws Exception {
        Random rnd = new Random();
        for (int i = 0; i < NUM_ROUNDS; ++i) {
            long checkpointId = randomCheckpointId(rnd);
            int numTasks = rnd.nextInt(MAX_TASK_STATES) + 1;
            int numSubtasks = rnd.nextInt(MAX_NUM_SUBTASKS) + 1;
            Collection<OperatorState> taskStates =
                    CheckpointTestUtils.createOperatorStates(
                            rnd, basePath, numTasks, 0, 0, numSubtasks);
            List<MasterState> masterStates = Collections.emptyList();
            testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
        }
    }

    @Test
    void testCheckpointWithMasterAndTaskStateForCheckpoint() throws Exception {
        testCheckpointWithMasterAndTaskState(null);
    }

    @Test
    void testCheckpointWithMasterAndTaskStateForSavepoint() throws Exception {
        testCheckpointWithMasterAndTaskState(newSavepointBasePath());
    }

    /**
     * Round-trips metadata that carries both operator states and master states.
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null} for the plain checkpoint variant
     */
    private void testCheckpointWithMasterAndTaskState(String basePath) throws Exception {
        Random rnd = new Random();
        for (int i = 0; i < NUM_ROUNDS; ++i) {
            long checkpointId = randomCheckpointId(rnd);
            int numTasks = rnd.nextInt(MAX_TASK_STATES) + 1;
            int numSubtasks = rnd.nextInt(MAX_NUM_SUBTASKS) + 1;
            Collection<OperatorState> taskStates =
                    CheckpointTestUtils.createOperatorStates(
                            rnd, basePath, numTasks, 0, 0, numSubtasks);
            int numMasterStates = rnd.nextInt(MAX_NUM_MASTER_STATES) + 1;
            Collection<MasterState> masterStates =
                    CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
            testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
        }
    }

    @Test
    void testCheckpointWithFinishedTasksForCheckpoint() throws Exception {
        testCheckpointWithFinishedTasks(null);
    }

    @Test
    void testCheckpointWithFinishedTasksForSavepoint() throws Exception {
        testCheckpointWithFinishedTasks(newSavepointBasePath());
    }

    /**
     * Round-trips a single metadata instance mixing all-running, partly finished and fully
     * finished operator states together with master states.
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null} for the plain checkpoint variant
     */
    private void testCheckpointWithFinishedTasks(String basePath) throws Exception {
        Random rnd = new Random();
        long checkpointId = randomCheckpointId(rnd);
        int numSubtasks = rnd.nextInt(MAX_NUM_SUBTASKS) + 1;
        int numAllRunningTasks = rnd.nextInt(MAX_ALL_RUNNING_TASK_STATES) + 1;
        int numPartlyFinishedTasks = rnd.nextInt(MAX_PARTLY_FINISHED_STATES) + 1;
        int numFullyFinishedTasks = rnd.nextInt(MAX_FULLY_FINISHED_STATES) + 1;
        Collection<OperatorState> taskStates =
                CheckpointTestUtils.createOperatorStates(
                        rnd,
                        basePath,
                        numAllRunningTasks,
                        numPartlyFinishedTasks,
                        numFullyFinishedTasks,
                        numSubtasks);
        int numMasterStates = rnd.nextInt(MAX_NUM_MASTER_STATES) + 1;
        Collection<MasterState> masterStates =
                CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
        testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
    }

    /**
     * Serializes the given metadata with {@link MetadataV3Serializer}, deserializes the resulting
     * bytes and asserts that checkpoint id, operator states (including their fully-finished
     * flags) and master states survive the round trip.
     *
     * @param checkpointId id stored in the metadata
     * @param operatorStates operator states stored in the metadata
     * @param masterStates master states stored in the metadata
     * @param basePath base path passed to the deserializer; when non-null an empty {@code
     *     _metadata} file is created below it first
     * @throws IOException if serialization, deserialization or creating the marker file fails
     */
    private void testCheckpointSerialization(
            long checkpointId,
            Collection<OperatorState> operatorStates,
            Collection<MasterState> masterStates,
            @Nullable String basePath)
            throws IOException {
        MetadataV3Serializer serializer = MetadataV3Serializer.INSTANCE;
        CheckpointMetadata metadata =
                new CheckpointMetadata(checkpointId, operatorStates, masterStates);

        ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
        // try-with-resources closes the wrapper even if serialize() throws.
        try (DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            serializer.serialize(metadata, out);
        }

        if (basePath != null) {
            // NOTE(review): presumably the deserializer resolves relative state paths against
            // basePath and expects this file to exist - confirm against the serializer base
            // class. The file is only created, never written to.
            Path metaPath = new Path(basePath, METADATA_FILE_NAME);
            FileSystem.getLocalFileSystem()
                    .create(metaPath, FileSystem.WriteMode.OVERWRITE)
                    .close();
        }

        byte[] bytes = baos.toByteArray();
        DataInputViewStreamWrapper in =
                new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(bytes));
        CheckpointMetadata deserialized =
                serializer.deserialize(in, getClass().getClassLoader(), basePath);

        Assertions.assertThat(deserialized.getCheckpointId()).isEqualTo(checkpointId);
        Assertions.assertThat(deserialized.getOperatorStates()).isEqualTo(operatorStates);
        // Compared separately from the equality check above so the finished flag is always
        // verified explicitly.
        Assertions.assertThat(
                        deserialized.getOperatorStates().stream()
                                .map(OperatorState::isFullyFinished)
                                .collect(Collectors.toList()))
                .isEqualTo(
                        operatorStates.stream()
                                .map(OperatorState::isFullyFinished)
                                .collect(Collectors.toList()));

        Assertions.assertThat(deserialized.getMasterStates()).hasSameSizeAs(masterStates);
        // Sizes were asserted equal above, so both iterators advance in lock step.
        Iterator<MasterState> expected = masterStates.iterator();
        Iterator<MasterState> actual = deserialized.getMasterStates().iterator();
        while (expected.hasNext()) {
            CheckpointTestUtils.assertMasterStateEquality(expected.next(), actual.next());
        }
    }

    /**
     * Round-trips a {@link KeyGroupsStateHandle} wrapping an in-memory byte handle through the
     * stream-state-handle (de)serialization and checks type, key-group offsets and payload.
     */
    @Test
    void testSerializeKeyGroupsStateHandle() throws IOException {
        KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 123);
        byte[] data = new byte[] {1, 2, 3, 4};
        try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
            MetadataV2V3SerializerBase.serializeStreamStateHandle(
                    new KeyGroupsStateHandle(offsets, new ByteStreamStateHandle("test", data)),
                    new DataOutputStream(out));
            try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
                StreamStateHandle handle =
                        MetadataV2V3SerializerBase.deserializeStreamStateHandle(
                                new DataInputStream(in), null);
                Assertions.assertThat(handle)
                        .isNotNull()
                        .isInstanceOf(KeyGroupsStateHandle.class);
                Assertions.assertThat(((KeyGroupsStateHandle) handle).getGroupRangeOffsets())
                        .isEqualTo(offsets);

                byte[] deserialized = new byte[data.length];
                try (FSDataInputStream dataStream = handle.openInputStream()) {
                    // InputStream.read may return fewer bytes than requested, so keep reading
                    // until the buffer is full or the stream ends, then verify the count.
                    int filled = 0;
                    while (filled < deserialized.length) {
                        int read =
                                dataStream.read(
                                        deserialized, filled, deserialized.length - filled);
                        if (read < 0) {
                            break;
                        }
                        filled += read;
                    }
                    Assertions.assertThat(filled).isEqualTo(data.length);
                    Assertions.assertThat(deserialized).isEqualTo(data);
                }
            }
        }
    }

    @Test
    void testSerializeIncrementalChangelogStateBackendHandle() throws IOException {
        testSerializeChangelogStateBackendHandle(false);
    }

    @Test
    void testSerializeFullChangelogStateBackendHandle() throws IOException {
        testSerializeChangelogStateBackendHandle(true);
    }

    /**
     * Round-trips a {@link ChangelogStateBackendHandle} through the keyed-state-handle
     * (de)serialization and checks that the materialized state handles are preserved.
     *
     * @param fullSnapshot {@code true} to wrap a key-group handle, {@code false} to wrap an
     *     incremental keyed state handle
     */
    private void testSerializeChangelogStateBackendHandle(boolean fullSnapshot)
            throws IOException {
        ChangelogStateBackendHandle handle = createChangelogStateBackendHandle(fullSnapshot);
        try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
            MetadataV2V3SerializerBase.serializeKeyedStateHandle(
                    handle, new DataOutputStream(out));
            try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
                KeyedStateHandle deserialized =
                        MetadataV2V3SerializerBase.deserializeKeyedStateHandle(
                                new DataInputStream(in), null);
                Assertions.assertThat(deserialized)
                        .isInstanceOfSatisfying(
                                ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl.class,
                                restored ->
                                        Assertions.assertThat(
                                                        restored.getMaterializedStateHandles())
                                                .isEqualTo(
                                                        handle.getMaterializedStateHandles()));
            }
        }
    }

    /**
     * Builds a changelog handle around a dummy keyed state handle.
     *
     * @param fullSnapshot selects the key-group handle ({@code true}) or the incremental handle
     *     ({@code false}) as the wrapped state
     */
    private ChangelogStateBackendHandle createChangelogStateBackendHandle(boolean fullSnapshot) {
        // Declared as the common KeyedStateHandle supertype: the two factory methods produce
        // different concrete handle classes.
        KeyedStateHandle keyedStateHandle =
                fullSnapshot
                        ? CheckpointTestUtils.createDummyKeyGroupStateHandle(
                                ThreadLocalRandom.current(), null)
                        : CheckpointTestUtils.createDummyIncrementalKeyedStateHandle(
                                ThreadLocalRandom.current());
        return ChangelogTestUtils.createChangelogStateBackendHandle(keyedStateHandle);
    }

    /** Draws a random non-negative checkpoint id by masking off the sign bit. */
    private static long randomCheckpointId(Random rnd) {
        return rnd.nextLong() & Long.MAX_VALUE;
    }

    /** Creates a fresh folder below the JUnit temp directory and returns its URI as a string. */
    private String newSavepointBasePath() throws IOException {
        return TempDirUtils.newFolder(temporaryFolder).toURI().toString();
    }
}

