/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.zookeeper;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.hive.com.google.common.base.Supplier;
import org.apache.hive.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.com.google.common.util.concurrent.Futures;
import org.apache.hive.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.org.apache.zookeeper.CreateMode;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.data.ACL;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.RetryUtils;
import org.apache.twill.internal.zookeeper.SettableOperationFuture;
import org.apache.twill.zookeeper.ACLData;
import org.apache.twill.zookeeper.ForwardingZKClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.RetryStrategy;
import org.apache.twill.zookeeper.ZKClient;

/**
 * A {@link ZKClient} decorator that re-issues failed operations against the
 * wrapped client, as directed by a {@link RetryStrategy}.
 *
 * <p>Every overridden operation returns a fresh future. That future completes with
 * the value of the first successful attempt, or with a failure once
 * {@link RetryUtils#canRetry(Throwable)} rejects the failure, the strategy returns
 * a negative delay, or a retry attempt cannot be issued at all.
 */
public final class FailureRetryZKClient extends ForwardingZKClient {
    /**
     * Scheduler shared by all instances; delayed retries are issued from its single thread.
     * NOTE(review): the thread factory name suggests daemon threads named "retry-zkclient" - confirm in Threads.
     */
    private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("retry-zkclient"));
    private final RetryStrategy retryStrategy;

    /**
     * @param delegate      client that actually performs the operations
     * @param retryStrategy consulted after each retryable failure for the delay (in milliseconds)
     *                      before the next attempt; a negative delay stops retrying
     */
    public FailureRetryZKClient(ZKClient delegate, RetryStrategy retryStrategy) {
        super(delegate);
        this.retryStrategy = retryStrategy;
    }

    /**
     * Creates a node, retrying on failure unless the node is sequential.
     */
    @Override
    public OperationFuture<String> create(final String path, final @Nullable byte[] data, final CreateMode createMode, final boolean createParent, final Iterable<ACL> acl) {
        // Sequential creates bypass the retry logic entirely and go straight to the delegate.
        // NOTE(review): presumably because a retried sequential create could leave more than
        // one sequential node behind - confirm against callers.
        if (createMode == CreateMode.PERSISTENT_SEQUENTIAL || createMode == CreateMode.EPHEMERAL_SEQUENTIAL) {
            return super.create(path, data, createMode, createParent, acl);
        }
        return this.retryOnFailure(RetryStrategy.OperationType.CREATE, path, new Supplier<OperationFuture<String>>(){

            @Override
            public OperationFuture<String> get() {
                return FailureRetryZKClient.super.create(path, data, createMode, createParent, acl);
            }
        });
    }

    /** Checks node existence, retrying on failure. */
    @Override
    public OperationFuture<Stat> exists(final String path, final Watcher watcher) {
        return this.retryOnFailure(RetryStrategy.OperationType.EXISTS, path, new Supplier<OperationFuture<Stat>>(){

            @Override
            public OperationFuture<Stat> get() {
                return FailureRetryZKClient.super.exists(path, watcher);
            }
        });
    }

    /** Fetches the children of a node, retrying on failure. */
    @Override
    public OperationFuture<NodeChildren> getChildren(final String path, final Watcher watcher) {
        return this.retryOnFailure(RetryStrategy.OperationType.GET_CHILDREN, path, new Supplier<OperationFuture<NodeChildren>>(){

            @Override
            public OperationFuture<NodeChildren> get() {
                return FailureRetryZKClient.super.getChildren(path, watcher);
            }
        });
    }

    /** Fetches the data of a node, retrying on failure. */
    @Override
    public OperationFuture<NodeData> getData(final String path, final Watcher watcher) {
        return this.retryOnFailure(RetryStrategy.OperationType.GET_DATA, path, new Supplier<OperationFuture<NodeData>>(){

            @Override
            public OperationFuture<NodeData> get() {
                return FailureRetryZKClient.super.getData(path, watcher);
            }
        });
    }

    /** Sets the data of a node, retrying on failure. */
    @Override
    public OperationFuture<Stat> setData(final String dataPath, final byte[] data, final int version) {
        return this.retryOnFailure(RetryStrategy.OperationType.SET_DATA, dataPath, new Supplier<OperationFuture<Stat>>(){

            @Override
            public OperationFuture<Stat> get() {
                return FailureRetryZKClient.super.setData(dataPath, data, version);
            }
        });
    }

    /** Deletes a node, retrying on failure. */
    @Override
    public OperationFuture<String> delete(final String deletePath, final int version) {
        return this.retryOnFailure(RetryStrategy.OperationType.DELETE, deletePath, new Supplier<OperationFuture<String>>(){

            @Override
            public OperationFuture<String> get() {
                return FailureRetryZKClient.super.delete(deletePath, version);
            }
        });
    }

    /** Fetches the ACL of a node, retrying on failure. */
    @Override
    public OperationFuture<ACLData> getACL(final String path) {
        return this.retryOnFailure(RetryStrategy.OperationType.GET_ACL, path, new Supplier<OperationFuture<ACLData>>(){

            @Override
            public OperationFuture<ACLData> get() {
                return FailureRetryZKClient.super.getACL(path);
            }
        });
    }

    /** Sets the ACL of a node, retrying on failure. */
    @Override
    public OperationFuture<Stat> setACL(final String path, final Iterable<ACL> acl, final int version) {
        return this.retryOnFailure(RetryStrategy.OperationType.SET_ACL, path, new Supplier<OperationFuture<Stat>>(){

            @Override
            public OperationFuture<Stat> get() {
                return FailureRetryZKClient.super.setACL(path, acl, version);
            }
        });
    }

    /**
     * Issues {@code operation} once immediately and wires its outcome, plus any
     * subsequent retries, into a new result future.
     *
     * @param type      operation kind reported to the retry strategy
     * @param path      node path reported to the retry strategy and attached to the result future
     * @param operation issues one attempt against the delegate each time it is invoked
     * @return the future that reflects the final outcome after retries
     */
    private <V> OperationFuture<V> retryOnFailure(RetryStrategy.OperationType type, String path, Supplier<OperationFuture<V>> operation) {
        SettableOperationFuture<V> result = SettableOperationFuture.create(path, Threads.SAME_THREAD_EXECUTOR);
        // The first attempt is issued before the start time is taken, so the timestamp
        // handed to the strategy is the moment the first attempt was submitted.
        Futures.addCallback(operation.get(), new OperationFutureCallback<V>(type, System.currentTimeMillis(), path, result, operation));
        return result;
    }

    /**
     * Callback attached to every attempt of one logical operation. Success completes
     * the result future; failure either schedules another attempt or fails the result.
     */
    private final class OperationFutureCallback<V>
    implements FutureCallback<V> {
        private final RetryStrategy.OperationType type;
        private final long startTime;
        private final String path;
        private final SettableOperationFuture<V> result;
        private final Supplier<OperationFuture<V>> retryAction;
        /** Number of failed attempts so far that were eligible for retry. */
        private final AtomicInteger failureCount;

        private OperationFutureCallback(RetryStrategy.OperationType type, long startTime, String path, SettableOperationFuture<V> result, Supplier<OperationFuture<V>> retryAction) {
            this.type = type;
            this.startTime = startTime;
            this.path = path;
            this.result = result;
            this.retryAction = retryAction;
            this.failureCount = new AtomicInteger(0);
        }

        @Override
        public void onSuccess(V result) {
            this.result.set(result);
        }

        @Override
        public void onFailure(Throwable t) {
            if (!this.doRetry(t)) {
                this.result.setException(t);
            }
        }

        /**
         * Schedules another attempt if the failure is retryable and the strategy allows it.
         *
         * @param t the failure of the latest attempt
         * @return {@code true} if a retry was scheduled, {@code false} if the caller should give up
         */
        private boolean doRetry(Throwable t) {
            if (!RetryUtils.canRetry(t)) {
                return false;
            }
            long nextRetry = FailureRetryZKClient.this.retryStrategy.nextRetry(this.failureCount.incrementAndGet(), this.startTime, this.type, this.path);
            // A negative delay is the strategy's signal to stop retrying.
            if (nextRetry < 0L) {
                return false;
            }
            SCHEDULER.schedule(new Runnable(){

                @Override
                public void run() {
                    OperationFuture<V> attempt;
                    try {
                        attempt = OperationFutureCallback.this.retryAction.get();
                    } catch (Throwable e) {
                        // Anything thrown here would otherwise be captured by the scheduler's
                        // unobserved ScheduledFuture, leaving the result future pending forever.
                        // Fail the result with the error that prevented the retry instead.
                        OperationFutureCallback.this.result.setException(e);
                        return;
                    }
                    Futures.addCallback(attempt, OperationFutureCallback.this);
                }
            }, nextRetry, TimeUnit.MILLISECONDS);
            return true;
        }
    }
}

