/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.Serializable;
import java.nio.file.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingExecutionPlanListener;
import org.apache.flink.runtime.jobmanager.ZooKeeperExecutionPlanStoreWatcher;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/**
 * Tests for {@link ZooKeeperExecutionPlanStoreWatcher}.
 *
 * <p>The test writes and removes a job graph node directly below the ZooKeeper root through a
 * {@link ZooKeeperStateHandleStore} and verifies that the watcher, which observes the children of
 * {@code "/"}, forwards both events to the registered {@link TestingExecutionPlanListener}.
 */
class ZooKeeperExecutionPlanStoreWatcherTest {
    /** Provides the ZooKeeper connect string that {@link #setup()} puts into the configuration. */
    @RegisterExtension
    public EachCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper =
            new EachCallbackWrapper<>(new ZooKeeperExtension());

    /** Base directory under which the HA storage folder is created for each test. */
    @TempDir
    public Path temporaryFolder;

    /** Configuration pointing at the test ZooKeeper quorum and a fresh HA storage path. */
    private Configuration configuration;

    /** Listener handed to the watcher; records the added and removed execution plan ids. */
    private TestingExecutionPlanListener testingExecutionPlanListener;

    /**
     * Builds a fresh configuration and listener before every test.
     *
     * @throws Exception if the HA storage folder cannot be created
     */
    @BeforeEach
    void setup() throws Exception {
        configuration = new Configuration();
        configuration.set(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                zooKeeperExtensionWrapper.getCustomExtension().getConnectString());
        configuration.set(
                HighAvailabilityOptions.HA_STORAGE_PATH,
                TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());
        testingExecutionPlanListener = new TestingExecutionPlanListener();
    }

    /**
     * Adds a job graph node and removes it again, expecting exactly one "added" and one "removed"
     * notification carrying the job's id.
     *
     * @throws Exception if ZooKeeper interaction fails or a wait condition is not met
     */
    @Test
    void testJobGraphAddedAndRemovedShouldNotifyGraphStoreListener() throws Exception {
        try (CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
                ZooKeeperUtils.startCuratorFramework(
                        configuration, NoOpFatalErrorHandler.INSTANCE)) {
            final CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
            final ExecutionPlanStoreWatcher executionPlanStoreWatcher =
                    createAndStartExecutionPlanStoreWatcher(client);

            // Stop the watcher even when a wait or assertion below fails. Resources are closed
            // in reverse declaration order, so the watcher is stopped before the Curator client
            // is closed, and an exception from stop() is attached as suppressed instead of
            // replacing the original test failure.
            try (AutoCloseable ignored = executionPlanStoreWatcher::stop) {
                final ZooKeeperStateHandleStore<JobGraph> stateHandleStore =
                        createStateHandleStore(client);

                final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
                final JobID jobID = jobGraph.getJobID();
                final String nodePath = "/" + jobID;

                stateHandleStore.addAndLock(nodePath, jobGraph);
                // Notifications arrive asynchronously, hence the wait before asserting.
                CommonTestUtils.waitUntilCondition(
                        () -> !testingExecutionPlanListener.getAddedExecutionPlans().isEmpty());
                Assertions.assertThat(testingExecutionPlanListener.getAddedExecutionPlans())
                        .containsExactly(jobID);

                stateHandleStore.releaseAndTryRemove(nodePath);
                CommonTestUtils.waitUntilCondition(
                        () -> !testingExecutionPlanListener.getRemovedExecutionPlans().isEmpty());
                Assertions.assertThat(testingExecutionPlanListener.getRemovedExecutionPlans())
                        .containsExactly(jobID);
            }
        }
    }

    /**
     * Creates a watcher on the children of the ZooKeeper root path and starts it with the test
     * listener.
     *
     * @param client Curator client the underlying {@link PathChildrenCache} is bound to
     * @return the started watcher; the caller is responsible for stopping it
     * @throws Exception if starting the watcher fails
     */
    private ExecutionPlanStoreWatcher createAndStartExecutionPlanStoreWatcher(CuratorFramework client) throws Exception {
        final ZooKeeperExecutionPlanStoreWatcher executionPlanStoreWatcher =
                new ZooKeeperExecutionPlanStoreWatcher(new PathChildrenCache(client, "/", false));
        executionPlanStoreWatcher.start(testingExecutionPlanListener);
        return executionPlanStoreWatcher;
    }

    /**
     * Creates the state handle store used to write and remove job graph nodes, backed by file
     * system state storage created from the test configuration with the prefix
     * {@code "test_jobgraph"}.
     *
     * @param client Curator client used by the store
     * @return a new state handle store for {@link JobGraph} instances
     * @throws Exception if the file system state storage cannot be created
     */
    private ZooKeeperStateHandleStore<JobGraph> createStateHandleStore(CuratorFramework client) throws Exception {
        final FileSystemStateStorageHelper<JobGraph> stateStorage =
                ZooKeeperUtils.createFileSystemStateStorage(configuration, "test_jobgraph");
        return new ZooKeeperStateHandleStore<>(client, stateStorage);
    }
}

