/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.DefaultSchedulerTest;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest;
import org.apache.flink.runtime.scheduler.adaptivebatch.PointwiseBlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.BiFunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class AdaptiveBatchSchedulerTest {
    private static final int SOURCE_PARALLELISM_1 = 6;
    private static final int SOURCE_PARALLELISM_2 = 4;
    private static final long SUBPARTITION_BYTES = 100L;
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private ComponentMainThreadExecutor mainThreadExecutor;
    private ManuallyTriggeredScheduledExecutor taskRestartExecutor;

    AdaptiveBatchSchedulerTest() {
    }

    @BeforeEach
    void setUp() {
        this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        this.taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    void testVertexInitializationFailureIsLabeled() throws Exception {
        JobGraph jobGraph = this.createBrokenJobGraph();
        TestingFailureEnricher failureEnricher = new TestingFailureEnricher();
        RestartBackoffTimeStrategy restartStrategy = new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(Integer.MAX_VALUE, 0L).create();
        SchedulerBase scheduler = this.createScheduler(jobGraph, Collections.singleton(failureEnricher), restartStrategy);
        scheduler.startScheduling();
        Iterable exceptionHistory = scheduler.requestJob().getExceptionHistory();
        RootExceptionHistoryEntry failure = (RootExceptionHistoryEntry)exceptionHistory.iterator().next();
        Assertions.assertThat((Throwable)failure.getException()).hasMessageContaining("The failure is not recoverable");
        Assertions.assertThat((Map)failure.getFailureLabels()).isEqualTo((Object)failureEnricher.getFailureLabels());
    }

    @Test
    void testAdaptiveBatchScheduler() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        Iterator jobVertexIterator = jobGraph.getVertices().iterator();
        JobVertex source1 = (JobVertex)jobVertexIterator.next();
        JobVertex source2 = (JobVertex)jobVertexIterator.next();
        JobVertex sink = (JobVertex)jobVertexIterator.next();
        SchedulerBase scheduler = this.createScheduler(jobGraph);
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source1);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source2);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(10);
        Assertions.assertThat((int)sink.getParallelism()).isEqualTo(10);
        this.checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 26000L);
    }

    @Test
    void testDecideParallelismForForwardTarget() throws Exception {
        JobVertex source = this.createJobVertex("source", 6);
        JobVertex map = this.createJobVertex("map", -1);
        JobVertex sink = this.createJobVertex("sink", -1);
        JobVertexConnectionUtils.connectNewDataSetAsInput(map, source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, map, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, false, true);
        SchedulerBase scheduler = this.createScheduler(new JobGraph(new JobID(), "test job", new JobVertex[]{source, map, sink}), DefaultSchedulerBuilder.createCustomParallelismDecider(jobVertexId -> {
            if (jobVertexId.equals((Object)map.getID())) {
                return 5;
            }
            return 10;
        }), 128);
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex mapExecutionJobVertex = graph.getJobVertex(map.getID());
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        Assertions.assertThat((int)mapExecutionJobVertex.getParallelism()).isEqualTo(-1);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source);
        Assertions.assertThat((int)mapExecutionJobVertex.getParallelism()).isEqualTo(5);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(5);
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, map);
        Assertions.assertThat((int)mapExecutionJobVertex.getParallelism()).isEqualTo(5);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(5);
        Assertions.assertThat((int)sink.getParallelism()).isEqualTo(5);
        this.checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 500L);
    }

    @Test
    void testUpdateBlockingResultInfoWhileScheduling() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        Iterator jobVertexIterator = jobGraph.getVertices().iterator();
        JobVertex source1 = (JobVertex)jobVertexIterator.next();
        JobVertex source2 = (JobVertex)jobVertexIterator.next();
        JobVertex sink = (JobVertex)jobVertexIterator.next();
        TestingJobMasterPartitionTracker partitionTracker = new TestingJobMasterPartitionTracker();
        partitionTracker.setIsPartitionTrackedFunction(ignore -> true);
        int maxParallelism = 6;
        AdaptiveBatchScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, this.mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDelayExecutor((ScheduledExecutor)this.taskRestartExecutor).setPartitionTracker(partitionTracker).setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(10, 0L).create()).setVertexParallelismAndInputInfosDecider(DefaultSchedulerBuilder.createCustomParallelismDecider(maxParallelism)).setDefaultMaxParallelism(maxParallelism).buildAdaptiveBatchJobScheduler();
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex source1ExecutionJobVertex = graph.getJobVertex(source1.getID());
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        AdaptiveBatchSchedulerTest.transitionExecutionsState((SchedulerBase)scheduler, ExecutionState.FINISHED, source1);
        PointwiseBlockingResultInfo blockingResultInfo = (PointwiseBlockingResultInfo)this.getBlockingResultInfo(scheduler, source1);
        Assertions.assertThat((int)blockingResultInfo.getNumOfRecordedPartitions()).isEqualTo(6);
        AdaptiveBatchSchedulerTest.transitionExecutionsState((SchedulerBase)scheduler, ExecutionState.FINISHED, source2);
        blockingResultInfo = (PointwiseBlockingResultInfo)this.getBlockingResultInfo(scheduler, source2);
        Assertions.assertThat((int)blockingResultInfo.getNumOfRecordedPartitions()).isEqualTo(4);
        this.triggerFailedByPartitionNotFound((SchedulerBase)scheduler, source1ExecutionJobVertex.getTaskVertices()[0], sinkExecutionJobVertex.getTaskVertices()[0]);
        this.taskRestartExecutor.triggerScheduledTasks();
        Assertions.assertThat((int)((PointwiseBlockingResultInfo)this.getBlockingResultInfo(scheduler, source1)).getNumOfRecordedPartitions()).isEqualTo(5);
    }

    @Test
    void testConsumeOneResultTwice() throws Exception {
        JobVertex source = this.createJobVertex("source1", 1);
        JobVertex sink = this.createJobVertex("sink", -1);
        IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, intermediateDataSetId, false);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, intermediateDataSetId, false);
        SchedulerBase scheduler = this.createScheduler(new JobGraph(new JobID(), "test job", new JobVertex[]{source, sink}), (VertexParallelismAndInputInfosDecider)DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(1, 16, 400L), 16);
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source);
        Assertions.assertThat((int)sinkExecutionJobVertex.getParallelism()).isEqualTo(8);
        Assertions.assertThat((int)sink.getParallelism()).isEqualTo(8);
    }

    @Test
    void testParallelismDecidedVerticesCanBeInitializedEarlier() throws Exception {
        JobVertex source = this.createJobVertex("source", 8);
        JobVertex sink = this.createJobVertex("sink", 8);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        SchedulerBase scheduler = this.createScheduler(new JobGraph(new JobID(), "test job", new JobVertex[]{source, sink}));
        DefaultExecutionGraph graph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
        ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID());
        scheduler.startScheduling();
        Assertions.assertThat((boolean)sinkExecutionJobVertex.isInitialized()).isTrue();
    }

    @Test
    void testUserConfiguredMaxParallelismIsLargerThanGlobalMaxParallelism() throws Exception {
        this.testUserConfiguredMaxParallelism(1, 32, 128, 1L, 32);
    }

    @Test
    void testUserConfiguredMaxParallelismIsSmallerThanGlobalMaxParallelism() throws Exception {
        this.testUserConfiguredMaxParallelism(1, 128, 32, 1L, 32);
    }

    @Test
    void testUserConfiguredMaxParallelismIsSmallerThanGlobalMinParallelism() throws Exception {
        this.testUserConfiguredMaxParallelism(16, 128, 8, 400L, 8);
    }

    @Test
    void testUserConfiguredMaxParallelismIsSmallerThanGlobalDefaultSourceParallelism() throws Exception {
        JobVertex source = this.createJobVertex("source", -1);
        source.setMaxParallelism(8);
        SchedulerBase scheduler = this.createScheduler(new JobGraph(new JobID(), "test job", new JobVertex[]{source}), (VertexParallelismAndInputInfosDecider)DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(1, 128, 1L, 32), 128);
        scheduler.startScheduling();
        Assertions.assertThat((int)source.getParallelism()).isEqualTo(8);
    }

    @Test
    void testMergeDynamicParallelismFutures() {
        ArrayList<CompletableFuture<Integer>> sourceParallelismFutures = new ArrayList<CompletableFuture<Integer>>();
        CompletableFuture<Integer> sourceParallelismFuture = CompletableFuture.completedFuture(-1);
        sourceParallelismFutures.add(sourceParallelismFuture);
        CompletableFuture mergedSourceParallelismFuture = AdaptiveBatchScheduler.mergeDynamicParallelismFutures(sourceParallelismFutures);
        Assertions.assertThat((Integer)((Integer)mergedSourceParallelismFuture.join())).isEqualTo(-1);
        CompletableFuture<Integer> sourceParallelismFuture1 = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> sourceParallelismFuture2 = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> sourceParallelismFuture4 = CompletableFuture.completedFuture(4);
        sourceParallelismFutures.add(sourceParallelismFuture1);
        sourceParallelismFutures.add(sourceParallelismFuture2);
        sourceParallelismFutures.add(sourceParallelismFuture4);
        mergedSourceParallelismFuture = AdaptiveBatchScheduler.mergeDynamicParallelismFutures(sourceParallelismFutures);
        Assertions.assertThat((Integer)((Integer)mergedSourceParallelismFuture.join())).isEqualTo(4);
    }

    @Test
    void testCloseAsyncReturnsMainThreadFuture() throws Exception {
        ScheduledExecutorService scheduledExecutorServiceForMainThread = Executors.newSingleThreadScheduledExecutor();
        try {
            DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest(scheduledExecutorServiceForMainThread, (BiFunctionWithException<ComponentMainThreadExecutor, CheckpointsCleaner, SchedulerNG, Exception>)((BiFunctionWithException)(mainThread, checkpointsCleaner) -> this.createSchedulerBuilder(this.createJobGraph(), (ComponentMainThreadExecutor)mainThread).setCheckpointCleaner((CheckpointsCleaner)checkpointsCleaner).buildAdaptiveBatchJobScheduler()));
        }
        finally {
            scheduledExecutorServiceForMainThread.shutdownNow();
        }
    }

    void testUserConfiguredMaxParallelism(int globalMinParallelism, int globalMaxParallelism, int userConfiguredMaxParallelism, long dataVolumePerTask, int expectedParallelism) throws Exception {
        JobVertex source = this.createJobVertex("source", 8);
        JobVertex sink = this.createJobVertex("sink", -1);
        sink.setMaxParallelism(userConfiguredMaxParallelism);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        SchedulerBase scheduler = this.createScheduler(new JobGraph(new JobID(), "test job", new JobVertex[]{source, sink}), (VertexParallelismAndInputInfosDecider)DefaultVertexParallelismAndInputInfosDeciderTest.createDecider(globalMinParallelism, globalMaxParallelism, dataVolumePerTask), globalMaxParallelism);
        scheduler.startScheduling();
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FINISHED, source);
        Assertions.assertThat((int)sink.getParallelism()).isEqualTo(expectedParallelism);
    }

    private BlockingResultInfo getBlockingResultInfo(AdaptiveBatchScheduler scheduler, JobVertex jobVertex) {
        return scheduler.getBlockingResultInfo(((IntermediateDataSet)Iterables.getOnlyElement((Iterable)jobVertex.getProducedDataSets())).getId());
    }

    private void checkAggregatedInputDataBytesIsCalculated(ExecutionJobVertex sinkExecutionJobVertex, long expectedTotalBytes) {
        ExecutionVertex[] executionVertices = sinkExecutionJobVertex.getTaskVertices();
        long totalInputBytes = 0L;
        for (ExecutionVertex ev : executionVertices) {
            long executionInputBytes = ev.getInputBytes();
            Assertions.assertThat((long)executionInputBytes).isNotEqualTo(-1L);
            totalInputBytes += executionInputBytes;
        }
        Assertions.assertThat((long)totalInputBytes).isEqualTo(expectedTotalBytes);
    }

    private void triggerFailedByPartitionNotFound(SchedulerBase scheduler, ExecutionVertex producerVertex, ExecutionVertex consumerVertex) {
        Execution execution = consumerVertex.getCurrentExecutionAttempt();
        IntermediateResultPartitionID partitionId = ((IntermediateResultPartition)Iterables.getOnlyElement(producerVertex.getProducedPartitions().values())).getPartitionId();
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FAILED, Collections.singletonList(execution), (Throwable)new PartitionNotFoundException(new ResultPartitionID(partitionId, producerVertex.getCurrentExecutionAttempt().getAttemptId())));
    }

    public static void transitionExecutionsState(SchedulerBase scheduler, ExecutionState state, List<Execution> executions, @Nullable Throwable throwable) {
        for (Execution execution : executions) {
            TaskExecutionState taskExecutionState = state == ExecutionState.FINISHED ? SchedulerTestingUtils.createFinishedTaskExecutionState(execution.getAttemptId(), AdaptiveBatchSchedulerTest.createResultPartitionBytesForExecution(execution)) : (state == ExecutionState.FAILED ? SchedulerTestingUtils.createFailedTaskExecutionState(execution.getAttemptId(), throwable) : new TaskExecutionState(execution.getAttemptId(), state));
            scheduler.updateTaskExecutionState(taskExecutionState);
        }
    }

    static Map<IntermediateResultPartitionID, ResultPartitionBytes> createResultPartitionBytesForExecution(Execution execution) {
        HashMap<IntermediateResultPartitionID, ResultPartitionBytes> partitionBytes = new HashMap<IntermediateResultPartitionID, ResultPartitionBytes>();
        execution.getVertex().getProducedPartitions().forEach((partitionId, partition) -> {
            int numOfSubpartitions = partition.getNumberOfSubpartitions();
            partitionBytes.put((IntermediateResultPartitionID)partitionId, new ResultPartitionBytes(LongStream.range(0L, numOfSubpartitions).boxed().mapToLong(ignored -> 100L).toArray()));
        });
        return partitionBytes;
    }

    public static void transitionExecutionsState(SchedulerBase scheduler, ExecutionState state, JobVertex jobVertex) {
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        List<Execution> executions = Arrays.asList(executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()).stream().map(ExecutionVertex::getCurrentExecutionAttempt).collect(Collectors.toList());
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, state, executions, null);
    }

    public JobVertex createJobVertex(String jobVertexName, int parallelism) {
        JobVertex jobVertex = new JobVertex(jobVertexName);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }
        return jobVertex;
    }

    public JobGraph createJobGraph() {
        return this.createJobGraph(false);
    }

    private JobGraph createBrokenJobGraph() {
        return this.createJobGraph(true);
    }

    public JobGraph createJobGraph(boolean broken) {
        JobVertex source1 = this.createJobVertex("source1", 6);
        JobVertex source2 = this.createJobVertex("source2", 4);
        JobVertex sink = this.createJobVertex("sink", -1);
        IntermediateDataSetID sharedDataSetId = new IntermediateDataSetID();
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, broken ? sharedDataSetId : new IntermediateDataSetID(), false);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, source2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING, broken ? sharedDataSetId : new IntermediateDataSetID(), false);
        return new JobGraph(new JobID(), "test job", new JobVertex[]{source1, source2, sink});
    }

    private SchedulerBase createScheduler(JobGraph jobGraph) throws Exception {
        return this.createScheduler(jobGraph, DefaultSchedulerBuilder.createCustomParallelismDecider(10), (Integer)BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue());
    }

    private SchedulerBase createScheduler(JobGraph jobGraph, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, int defaultMaxParallelism) throws Exception {
        return this.createSchedulerBuilder(jobGraph).setVertexParallelismAndInputInfosDecider(vertexParallelismAndInputInfosDecider).setDefaultMaxParallelism(defaultMaxParallelism).buildAdaptiveBatchJobScheduler();
    }

    private SchedulerBase createScheduler(JobGraph jobGraph, Collection<FailureEnricher> failureEnrichers, RestartBackoffTimeStrategy strategy) throws Exception {
        return this.createSchedulerBuilder(jobGraph).setRestartBackoffTimeStrategy(strategy).setFailureEnrichers(failureEnrichers).buildAdaptiveBatchJobScheduler();
    }

    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
        return this.createSchedulerBuilder(jobGraph, this.mainThreadExecutor);
    }

    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
        return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setDelayExecutor((ScheduledExecutor)this.taskRestartExecutor);
    }
}

