/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElection;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
import org.apache.flink.runtime.resourcemanager.TestingResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ResourceManagerServiceImpl}.
 *
 * <p>The tests drive the service through a {@link TestingLeaderElection} (granting and revoking
 * leadership) and observe the resource manager life cycle through the callbacks that can be
 * registered on {@link TestingResourceManagerFactory.Builder}.
 */
class ResourceManagerServiceImplTest {

    /** How long {@link #assertNotComplete} waits before it treats a future as still pending. */
    private static final long NOT_COMPLETE_TIMEOUT_MILLIS = 50L;

    private static final HeartbeatServices heartbeatServices = new TestingHeartbeatServices();
    private static final DelegationTokenManager delegationTokenManager =
            new NoOpDelegationTokenManager();
    private static final ClusterInformation clusterInformation =
            new ClusterInformation("localhost", 1234);
    private static final MetricRegistry metricRegistry = TestingMetricRegistry.builder().build();

    // Shared by all tests of this class; created in setupClass().
    private static TestingRpcService rpcService;
    private static TestingHighAvailabilityServices haService;
    private static TestingFatalErrorHandler fatalErrorHandler;

    // Recreated for every test in setup().
    private TestingResourceManagerFactory.Builder rmFactoryBuilder;
    private TestingLeaderElection leaderElection;

    // Only set by tests that create a service; teardown() closes it when present.
    private ResourceManagerServiceImpl resourceManagerService;

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
        haService = new TestingHighAvailabilityServices();
        fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @BeforeEach
    void setup() {
        // The handler is shared between tests, so drop whatever a previous test left behind.
        fatalErrorHandler.clearError();

        rmFactoryBuilder = new TestingResourceManagerFactory.Builder();
        leaderElection = new TestingLeaderElection();
        haService.setResourceManagerLeaderElection(leaderElection);
    }

    @AfterEach
    void teardown() throws Exception {
        leaderElection.close();

        if (resourceManagerService != null) {
            resourceManagerService.close();
        }

        // Surface fatal errors that were reported asynchronously while the test was running.
        if (fatalErrorHandler.hasExceptionOccurred()) {
            fatalErrorHandler.rethrowError();
        }
    }

    @AfterAll
    static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService);
        }
    }

    /** Creates the service under test with the shared metric registry and starts it. */
    private void createAndStartResourceManager() throws Exception {
        createResourceManager();
        resourceManagerService.start();
    }

    /** Creates (but does not start) the service under test with the shared metric registry. */
    private void createResourceManager() throws Exception {
        createResourceManager(metricRegistry);
    }

    /**
     * Creates (but does not start) the service under test and stores it in {@link
     * #resourceManagerService}.
     *
     * @param registry metric registry handed to the service
     */
    private void createResourceManager(MetricRegistry registry) throws Exception {
        final TestingResourceManagerFactory rmFactory = rmFactoryBuilder.build();

        resourceManagerService =
                ResourceManagerServiceImpl.create(
                        rmFactory,
                        new Configuration(),
                        ResourceID.generate(),
                        rpcService,
                        haService,
                        heartbeatServices,
                        delegationTokenManager,
                        fatalErrorHandler,
                        clusterInformation,
                        null,
                        registry,
                        "localhost",
                        ForkJoinPool.commonPool());
    }

    /** Granting leadership initializes an RM with the session id and confirms that session. */
    @Test
    void grantLeadership_startRmAndConfirmLeaderSession() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        createAndStartResourceManager();

        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId);

        FlinkAssertions.assertThatFuture(startRmFuture)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId);
    }

    /** The leader session is not confirmed while the RM initialization callback is blocked. */
    @Test
    void grantLeadership_confirmLeaderSessionAfterRmStarted() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(
                ignore -> blockOnFuture(finishRmInitializationFuture));

        createAndStartResourceManager();

        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId);

        assertNotComplete(confirmedLeaderInformation);

        // Unblock the initialization; only now may the confirmation arrive.
        finishRmInitializationFuture.complete(null);

        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId);
    }

    /** A second leadership grant terminates the first RM and starts one for the new session. */
    @Test
    void grantLeadership_withExistingLeader_stopExistLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // The first initialization completes future 1, any later one future 2.
                            if (!startRmFuture1.isDone()) {
                                startRmFuture1.complete(uuid);
                            } else {
                                startRmFuture2.complete(uuid);
                            }
                        })
                .setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();

        // first time grant leadership
        leaderElection.isLeader(leaderSessionId1).join();

        // second time grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId2);

        FlinkAssertions.assertThatFuture(terminateRmFuture)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId1);
        FlinkAssertions.assertThatFuture(startRmFuture2)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId2);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId2);
    }

    /** The RM for a new session is not started while the previous RM's termination is blocked. */
    @Test
    void grantLeadership_withExistingLeader_waitTerminationOfExistingLeader() throws Exception {
        final UUID leaderSessionId1 = UUID.randomUUID();
        final UUID leaderSessionId2 = UUID.randomUUID();
        final CompletableFuture<UUID> startRmFuture1 = new CompletableFuture<>();
        final CompletableFuture<UUID> startRmFuture2 = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setInitializeConsumer(
                        uuid -> {
                            // The first initialization completes future 1, any later one future 2.
                            if (!startRmFuture1.isDone()) {
                                startRmFuture1.complete(uuid);
                            } else {
                                startRmFuture2.complete(uuid);
                            }
                        })
                .setTerminateConsumer(ignore -> blockOnFuture(finishRmTerminationFuture));

        createAndStartResourceManager();

        // first time grant leadership
        leaderElection.isLeader(leaderSessionId1).join();

        // second time grant leadership
        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(leaderSessionId2);

        // The termination callback of the first RM is still blocked.
        assertNotComplete(startRmFuture2);

        // Let the first RM finish terminating.
        finishRmTerminationFuture.complete(null);

        FlinkAssertions.assertThatFuture(startRmFuture2)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId2);
        Assertions.assertThat(confirmedLeaderInformation.get().getLeaderSessionID())
                .isSameAs(leaderSessionId2);
    }

    /** A service that was created but never started neither starts an RM nor confirms. */
    @Test
    void grantLeadership_notStarted_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        // Deliberately not started.
        createResourceManager();

        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(UUID.randomUUID());

        assertNotComplete(startRmFuture);
        assertNotComplete(confirmedLeaderInformation);
    }

    /** A service that has already been closed neither starts an RM nor confirms. */
    @Test
    void grantLeadership_stopped_doesNotStartNewRm() throws Exception {
        final CompletableFuture<UUID> startRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(startRmFuture::complete);

        createAndStartResourceManager();
        resourceManagerService.close();

        final CompletableFuture<LeaderInformation> confirmedLeaderInformation =
                leaderElection.isLeader(UUID.randomUUID());

        assertNotComplete(startRmFuture);
        assertNotComplete(confirmedLeaderInformation);
    }

    /** Revoking leadership invokes the terminate callback with the revoked session id. */
    @Test
    void revokeLeadership_stopExistLeader() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();
        leaderElection.isLeader(leaderSessionId).join();

        leaderElection.notLeader();

        FlinkAssertions.assertThatFuture(terminateRmFuture)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId);
    }

    /**
     * With multi leader session support switched off, revoking leadership completes the service's
     * termination future.
     */
    @Test
    void revokeLeadership_terminateService_multiLeaderSessionNotSupported() throws Exception {
        rmFactoryBuilder.setSupportMultiLeaderSession(false);

        createAndStartResourceManager();
        leaderElection.isLeader(UUID.randomUUID()).join();

        leaderElection.notLeader();

        // Returns only once the service has terminated.
        resourceManagerService.getTerminationFuture().get();
    }

    /** Completing the leader RM's termination future completes the service's termination future. */
    @Test
    void leaderRmTerminated_terminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder.setGetTerminationFutureFunction(
                (ignore1, ignore2) -> rmTerminationFuture);

        createAndStartResourceManager();
        leaderElection.isLeader(leaderSessionId).join();

        rmTerminationFuture.complete(null);

        // Returns only once the service has terminated.
        resourceManagerService.getTerminationFuture().get();
    }

    /**
     * Completing the termination future of an RM after its leadership was revoked leaves the
     * service's termination future pending.
     */
    @Test
    void nonLeaderRmTerminated_doseNotTerminateService() throws Exception {
        final UUID leaderSessionId = UUID.randomUUID();
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();
        final CompletableFuture<Void> rmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder
                .setTerminateConsumer(terminateRmFuture::complete)
                .setGetTerminationFutureFunction((ignore1, ignore2) -> rmTerminationFuture);

        createAndStartResourceManager();
        leaderElection.isLeader(leaderSessionId).join();

        leaderElection.notLeader();
        FlinkAssertions.assertThatFuture(terminateRmFuture)
                .eventuallySucceeds()
                .isSameAs(leaderSessionId);

        rmTerminationFuture.complete(null);

        assertNotComplete(resourceManagerService.getTerminationFuture());
    }

    /** Closing the service invokes the RM terminate callback and stops the leader election. */
    @Test
    void closeService_stopRmAndLeaderElection() throws Exception {
        final CompletableFuture<UUID> terminateRmFuture = new CompletableFuture<>();

        rmFactoryBuilder.setTerminateConsumer(terminateRmFuture::complete);

        createAndStartResourceManager();
        leaderElection.isLeader(UUID.randomUUID()).join();
        Assertions.assertThat(leaderElection.isStopped()).isFalse();

        resourceManagerService.close();

        FlinkAssertions.assertThatFuture(terminateRmFuture).isDone();
        Assertions.assertThat(leaderElection.isStopped()).isTrue();
    }

    /** The future returned by closeAsync() stays pending while RM termination is blocked. */
    @Test
    void closeService_futureCompleteAfterRmTerminated() throws Exception {
        final CompletableFuture<Void> finishRmTerminationFuture = new CompletableFuture<>();

        rmFactoryBuilder.setTerminateConsumer(ignore -> blockOnFuture(finishRmTerminationFuture));

        createAndStartResourceManager();
        leaderElection.isLeader(UUID.randomUUID()).join();

        final CompletableFuture<Void> closeServiceFuture = resourceManagerService.closeAsync();

        assertNotComplete(closeServiceFuture);

        // Unblock the RM termination so that closing can finish.
        finishRmTerminationFuture.complete(null);

        closeServiceFuture.get();
    }

    /**
     * Deregistering the application while the leader RM is still blocked in its initialization
     * callback stays pending until the initialization is allowed to finish.
     */
    @Test
    void deregisterApplication_leaderRmNotStarted() throws Exception {
        final CompletableFuture<Void> startRmInitializationFuture = new CompletableFuture<>();
        final CompletableFuture<Void> finishRmInitializationFuture = new CompletableFuture<>();

        rmFactoryBuilder.setInitializeConsumer(
                ignore -> {
                    // Signal that the initialization has begun, then hold it open.
                    startRmInitializationFuture.complete(null);
                    blockOnFuture(finishRmInitializationFuture);
                });

        createAndStartResourceManager();
        leaderElection.isLeader(UUID.randomUUID());

        // Wait until the initialization callback has been entered.
        startRmInitializationFuture.get();

        final CompletableFuture<Void> deregisterApplicationFuture =
                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);

        assertNotComplete(deregisterApplicationFuture);

        finishRmInitializationFuture.complete(null);

        FlinkAssertions.assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
    }

    /** Deregistering the application succeeds when leadership was never granted. */
    @Test
    void deregisterApplication_noLeaderRm() throws Exception {
        createAndStartResourceManager();

        final CompletableFuture<Void> deregisterApplicationFuture =
                resourceManagerService.deregisterApplication(ApplicationStatus.CANCELED, null);

        FlinkAssertions.assertThatFuture(deregisterApplicationFuture).eventuallySucceeds();
    }

    /**
     * The expected leader metrics are registered on a leadership grant, gone after the revocation
     * and registered again on the next grant.
     */
    @Test
    void grantAndRevokeLeadership_verifyMetrics() throws Exception {
        // Names of the currently registered metrics, maintained by the registry callbacks below.
        final Set<String> registeredMetrics = Collections.newSetFromMap(new ConcurrentHashMap<>());
        final TestingMetricRegistry trackingMetricRegistry =
                TestingMetricRegistry.builder()
                        .setRegisterConsumer((a, b, c) -> registeredMetrics.add(b))
                        .setUnregisterConsumer((a, b, c) -> registeredMetrics.remove(b))
                        .build();

        createResourceManager(trackingMetricRegistry);
        resourceManagerService.start();

        Assertions.assertThat(registeredMetrics).isEmpty();

        leaderElection.isLeader(UUID.randomUUID()).join();

        final Set<String> expectedMetrics =
                Sets.set("numRegisteredTaskManagers", "taskSlotsTotal", "taskSlotsAvailable");
        Assertions.assertThat(registeredMetrics)
                .as("Expected RM to register leader metrics")
                .containsAll(expectedMetrics);

        revokeLeadership();

        // None of the expected metrics may still be registered after the revocation.
        final Set<String> intersection = new HashSet<>(registeredMetrics);
        intersection.retainAll(expectedMetrics);
        Assertions.assertThat(intersection)
                .as("Expected RM to unregister leader metrics")
                .isEmpty();

        leaderElection.isLeader(UUID.randomUUID()).join();

        Assertions.assertThat(registeredMetrics)
                .as("Expected RM to re-register leader metrics")
                .containsAll(expectedMetrics);
    }

    /**
     * Blocks the calling thread until {@code future} completes and fails the test if waiting is
     * interrupted or the future does not complete normally.
     *
     * <p>The triggering exception is attached to the assertion failure as its cause. The interrupt
     * flag is restored when the wait was interrupted.
     */
    private static void blockOnFuture(CompletableFuture<?> future) {
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            org.junit.jupiter.api.Assertions.fail(
                    "Interrupted while waiting for the future to complete.", e);
        } catch (Exception e) {
            org.junit.jupiter.api.Assertions.fail("The future did not complete normally.", e);
        }
    }

    /**
     * Asserts that {@code future} does not complete within {@link #NOT_COMPLETE_TIMEOUT_MILLIS}
     * milliseconds.
     */
    private static void assertNotComplete(CompletableFuture<?> future) {
        FlinkAssertions.assertThatFuture(future)
                .failsWithin(NOT_COMPLETE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)
                .withThrowableOfType(TimeoutException.class);
    }

    /** Revokes leadership and waits until the former leader RM's termination future completes. */
    private void revokeLeadership() {
        // Fetch the RM before revoking; the termination future is awaited on this instance.
        final ResourceManager<?> leaderResourceManager =
                resourceManagerService.getLeaderResourceManager();
        leaderElection.notLeader();
        blockOnFuture(leaderResourceManager.getTerminationFuture());
    }
}

