/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.TestingResultPartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.testutils.TestingUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class PartitionRequestRegistrationTest {
    PartitionRequestRegistrationTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRegisterResultPartitionBeforeRequest() throws Exception {
        TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
        CountDownLatch sync = new CountDownLatch(1);
        CancelPartitionRequestTest.InfiniteSubpartitionView view = new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        TestingResultPartition resultPartition = TestingResultPartition.newBuilder().setResultPartitionManager(partitionManager).setCreateSubpartitionViewFunction((index, listener) -> view).build();
        partitionManager.registerResultPartition((ResultPartition)resultPartition);
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            NettyProtocol protocol = new NettyProtocol((ResultPartitionProvider)partitionManager, (TaskEventPublisher)new NoOpTaskEventPublisher());
            serverAndClient = NettyTestUtil.initServerAndClient(protocol);
            Channel ch = NettyTestUtil.connect(serverAndClient);
            ch.writeAndFlush((Object)new NettyMessage.PartitionRequest(resultPartition.getPartitionId(), new ResultSubpartitionIndexSet(0), new InputChannelID(), Integer.MAX_VALUE)).await();
            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition."));
            }
        }
        catch (Throwable throwable) {
            NettyTestUtil.shutdown(serverAndClient);
            throw throwable;
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRegisterResultPartitionAfterRequest() throws Exception {
        TestPooledBufferProvider outboundBuffers = new TestPooledBufferProvider(16);
        CountDownLatch sync = new CountDownLatch(1);
        CancelPartitionRequestTest.InfiniteSubpartitionView view = new CancelPartitionRequestTest.InfiniteSubpartitionView(outboundBuffers, sync);
        ResultPartitionManager partitionManager = new ResultPartitionManager();
        TestingResultPartition resultPartition = TestingResultPartition.newBuilder().setResultPartitionManager(partitionManager).setCreateSubpartitionViewFunction((index, listener) -> view).build();
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            NettyProtocol protocol = new NettyProtocol((ResultPartitionProvider)partitionManager, (TaskEventPublisher)new NoOpTaskEventPublisher());
            serverAndClient = NettyTestUtil.initServerAndClient(protocol);
            Channel ch = NettyTestUtil.connect(serverAndClient);
            ch.writeAndFlush((Object)new NettyMessage.PartitionRequest(resultPartition.getPartitionId(), new ResultSubpartitionIndexSet(0), new InputChannelID(), Integer.MAX_VALUE)).await();
            partitionManager.registerResultPartition((ResultPartition)resultPartition);
            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition."));
            }
        }
        catch (Throwable throwable) {
            NettyTestUtil.shutdown(serverAndClient);
            throw throwable;
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testPartitionRequestNotifierTimeout() throws Exception {
        ResultPartitionID pid = new ResultPartitionID();
        CountDownLatch sync = new CountDownLatch(1);
        NettyTestUtil.NettyServerAndClient serverAndClient = null;
        try {
            ResultPartitionProvider partitions = new ResultPartitionProvider(){

                public ResultSubpartitionView createSubpartitionView(ResultPartitionID partitionId, ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener) {
                    return null;
                }

                public Optional<ResultSubpartitionView> createSubpartitionViewOrRegisterListener(ResultPartitionID partitionId, ResultSubpartitionIndexSet indexSet, BufferAvailabilityListener availabilityListener, PartitionRequestListener partitionRequestListener) {
                    partitionRequestListener.notifyPartitionCreatedTimeout();
                    return Optional.empty();
                }

                public void releasePartitionRequestListener(PartitionRequestListener listener) {
                }
            };
            NettyProtocol protocol = new NettyProtocol(partitions, (TaskEventPublisher)new NoOpTaskEventPublisher());
            serverAndClient = NettyTestUtil.initServerAndClient(protocol);
            Channel ch = NettyTestUtil.connect(serverAndClient);
            NetworkClientHandler clientHandler = (NetworkClientHandler)ch.pipeline().get(NetworkClientHandler.class);
            TestRemoteInputChannelForPartitionNotFound remoteInputChannel = new TestRemoteInputChannelForPartitionNotFound(sync);
            clientHandler.addInputChannel((RemoteInputChannel)remoteInputChannel);
            ch.writeAndFlush((Object)new NettyMessage.PartitionRequest(pid, new ResultSubpartitionIndexSet(0), remoteInputChannel.getInputChannelId(), Integer.MAX_VALUE)).await();
            if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
                Assertions.fail((String)("Timed out after waiting for " + TestingUtils.TESTING_DURATION.toMillis() + " ms to be notified about cancelled partition."));
            }
        }
        catch (Throwable throwable) {
            NettyTestUtil.shutdown(serverAndClient);
            throw throwable;
        }
        NettyTestUtil.shutdown(serverAndClient);
    }

    private static class NoOpTaskEventPublisher
    implements TaskEventPublisher {
        private NoOpTaskEventPublisher() {
        }

        public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
            return true;
        }
    }

    private static class TestRemoteInputChannelForPartitionNotFound
    extends RemoteInputChannel {
        private final CountDownLatch latch;

        TestRemoteInputChannelForPartitionNotFound(CountDownLatch latch) {
            super(new SingleInputGateBuilder().setNumberOfChannels(1).build(), 0, new ResultPartitionID(), new ResultSubpartitionIndexSet(0), InputChannelBuilder.STUB_CONNECTION_ID, (ConnectionManager)new TestingConnectionManager(), 0, 100, 10000, 2, (Counter)new SimpleCounter(), (Counter)new SimpleCounter(), ChannelStateWriter.NO_OP);
            this.latch = latch;
        }

        public void onFailedPartitionRequest() {
            this.latch.countDown();
        }
    }
}

