/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.TestBarrierHandlerFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.util.function.SupplierWithException;

public class TestCheckpointedInputGateBuilder {
    private final int numChannels;
    private final TestBarrierHandlerFactory barrierHandlerFactory;
    private ChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
    private SupplierWithException<SingleInputGate, IOException> gateBuilder = this::buildTestGate;
    private MailboxExecutor mailboxExecutor;

    public TestCheckpointedInputGateBuilder(int numChannels, TestBarrierHandlerFactory barrierHandler) {
        this.numChannels = numChannels;
        this.barrierHandlerFactory = barrierHandler;
        this.mailboxExecutor = new SyncMailboxExecutor();
    }

    public TestCheckpointedInputGateBuilder withRemoteChannels() {
        this.gateBuilder = this::buildRemoteGate;
        return this;
    }

    public TestCheckpointedInputGateBuilder withMixedChannels(Integer ... testChannelIds) {
        this.gateBuilder = () -> this.buildMixedGate(testChannelIds);
        return this;
    }

    public TestCheckpointedInputGateBuilder withTestChannels() {
        this.gateBuilder = this::buildTestGate;
        return this;
    }

    public TestCheckpointedInputGateBuilder withSyncExecutor() {
        this.mailboxExecutor = new SyncMailboxExecutor();
        return this;
    }

    public TestCheckpointedInputGateBuilder withMailboxExecutor() {
        this.mailboxExecutor = new MailboxProcessor().getMainMailboxExecutor();
        return this;
    }

    public TestCheckpointedInputGateBuilder withChannelStateWriter(ChannelStateWriter channelStateWriter) {
        this.channelStateWriter = channelStateWriter;
        return this;
    }

    public CheckpointedInputGate build() throws IOException {
        SingleInputGate gate = (SingleInputGate)this.gateBuilder.get();
        return new CheckpointedInputGate((InputGate)gate, (CheckpointBarrierHandler)this.barrierHandlerFactory.create(gate, this.channelStateWriter), this.mailboxExecutor);
    }

    private SingleInputGate buildTestGate() {
        SingleInputGate gate = new SingleInputGateBuilder().setNumberOfChannels(this.numChannels).build();
        InputChannel[] channels = new TestInputChannel[this.numChannels];
        for (int i = 0; i < this.numChannels; ++i) {
            channels[i] = new TestInputChannel(gate, i, false, true);
        }
        gate.setInputChannels(channels);
        return gate;
    }

    private SingleInputGate buildMixedGate(Integer ... testChannelIds) throws IOException {
        HashSet<Integer> testChannelIdSet = new HashSet<Integer>(Arrays.asList(testChannelIds));
        SingleInputGate gate = this.buildRemoteGate();
        InputChannel[] channels = new InputChannel[this.numChannels];
        for (int i = 0; i < this.numChannels; ++i) {
            channels[i] = testChannelIdSet.contains(i) ? new TestInputChannel(gate, i, false, true) : gate.getChannel(i);
        }
        gate.setInputChannels(channels);
        return gate;
    }

    private SingleInputGate buildRemoteGate() throws IOException {
        int maxUsedBuffers = 10;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(this.numChannels * maxUsedBuffers, 4096);
        SingleInputGate gate = new SingleInputGateBuilder().setChannelFactory(InputChannelBuilder::buildRemoteChannel).setNumberOfChannels(this.numChannels).setSegmentProvider((MemorySegmentProvider)networkBufferPool).setBufferPoolFactory(networkBufferPool.createBufferPool(this.numChannels, maxUsedBuffers)).setChannelStateWriter(this.channelStateWriter).build();
        gate.setup();
        gate.requestPartitions();
        return gate;
    }
}

