/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.zookeeper;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.hive.com.google.common.base.Preconditions;
import org.apache.hive.com.google.common.base.Supplier;
import org.apache.hive.com.google.common.collect.ArrayListMultimap;
import org.apache.hive.com.google.common.collect.ImmutableList;
import org.apache.hive.com.google.common.collect.ImmutableMultimap;
import org.apache.hive.com.google.common.collect.Multimap;
import org.apache.hive.com.google.common.util.concurrent.AbstractService;
import org.apache.hive.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.com.google.common.util.concurrent.Futures;
import org.apache.hive.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.com.google.common.util.concurrent.Service;
import org.apache.hive.org.apache.zookeeper.AsyncCallback;
import org.apache.hive.org.apache.zookeeper.CreateMode;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.ZooDefs;
import org.apache.hive.org.apache.zookeeper.ZooKeeper;
import org.apache.hive.org.apache.zookeeper.data.ACL;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.BasicACLData;
import org.apache.twill.internal.zookeeper.BasicNodeChildren;
import org.apache.twill.internal.zookeeper.BasicNodeData;
import org.apache.twill.internal.zookeeper.SettableOperationFuture;
import org.apache.twill.zookeeper.ACLData;
import org.apache.twill.zookeeper.AbstractZKClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClientService;

public final class DefaultZKClientService
extends AbstractZKClient
implements ZKClientService {
    /** Logger shared by this client and its nested helper classes. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultZKClientService.class);
    /** Connect string handed to every {@link ZooKeeper} handle created by this client. */
    private final String zkStr;
    /** Session timeout value passed unchanged to the {@link ZooKeeper} constructor. */
    private final int sessionTimeout;
    /** Watchers that receive connection-level events (events whose type is {@code None}). */
    private final List<Watcher> connectionWatchers;
    /** Private copy of the scheme-to-credential pairs added to each new {@link ZooKeeper} handle. */
    private final Multimap<String, byte[]> authInfos;
    /** Current ZooKeeper handle; holds {@code null} until the service starts and after it stops or fails. */
    private final AtomicReference<ZooKeeper> zooKeeper;
    /** Guava service that implements the start/stop lifecycle this class delegates to. */
    private final Service serviceDelegate;
    /**
     * Executor used to run watcher callbacks and to complete operation futures; assigned in
     * {@code ServiceDelegate.doStart()}.
     * NOTE(review): the field is neither final nor volatile although it is read from callback
     * threads — confirm that a happens-before edge exists via the service start.
     */
    private ExecutorService eventExecutor;

    /**
     * Creates a client that connects without any authentication info.
     *
     * @param zkStr ZooKeeper connect string
     * @param sessionTimeout session timeout passed to ZooKeeper
     * @param connectionWatcher optional watcher for connection events; may be {@code null}
     * @deprecated use the constructor that additionally accepts the auth infos
     */
    @Deprecated
    public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher) {
        this(zkStr, sessionTimeout, connectionWatcher, ImmutableMultimap.<String, byte[]>of());
    }

    /**
     * Creates a client for the given ZooKeeper ensemble. No connection is made until the service
     * is started.
     *
     * @param zkStr ZooKeeper connect string
     * @param sessionTimeout session timeout passed to ZooKeeper
     * @param connectionWatcher optional watcher for connection events; {@code null} is ignored
     * @param authInfos scheme-to-credential pairs; the byte arrays are copied defensively
     */
    public DefaultZKClientService(String zkStr, int sessionTimeout, Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
        this.zkStr = zkStr;
        this.sessionTimeout = sessionTimeout;
        this.authInfos = copyAuthInfo(authInfos);
        this.zooKeeper = new AtomicReference<ZooKeeper>();
        // The watcher list must exist before the initial watcher is registered.
        this.connectionWatchers = new CopyOnWriteArrayList<Watcher>();
        addConnectionWatcher(connectionWatcher);
        this.serviceDelegate = new ServiceDelegate();
    }

    /**
     * Returns the session id of the current ZooKeeper handle.
     *
     * @return the session id, or {@code null} when no handle is currently held
     */
    @Override
    public Long getSessionId() {
        ZooKeeper client = zooKeeper.get();
        if (client == null) {
            return null;
        }
        return Long.valueOf(client.getSessionId());
    }

    /**
     * Returns the connect string this client was constructed with.
     */
    @Override
    public String getConnectString() {
        return zkStr;
    }

    /**
     * Registers a watcher for connection-level events.
     *
     * @param watcher the watcher to register; {@code null} is accepted and ignored
     * @return a handle that removes the watcher again; a no-op handle for a {@code null} watcher
     */
    @Override
    public Cancellable addConnectionWatcher(final Watcher watcher) {
        if (watcher != null) {
            connectionWatchers.add(watcher);
            return new Cancellable(){

                @Override
                public void cancel() {
                    connectionWatchers.remove(watcher);
                }
            };
        }
        // Nothing was registered, so there is nothing to undo.
        return new Cancellable(){

            @Override
            public void cancel() {
            }
        };
    }

    /**
     * Creates a node asynchronously.
     *
     * @param path path of the node to create
     * @param data node payload; may be {@code null}
     * @param createMode ZooKeeper create mode
     * @param createParent whether missing ancestors should be created as well
     * @param acl ACLs for the new node; snapshotted into an immutable list
     * @return a future completed with the created path
     */
    @Override
    public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode, boolean createParent, Iterable<ACL> acl) {
        List<ACL> aclSnapshot = ImmutableList.copyOf(acl);
        return doCreate(path, data, createMode, createParent, aclSnapshot, false);
    }

    /**
     * Issues an asynchronous create and, when {@code createParent} is set, recovers from a
     * missing parent by creating the ancestors and retrying the create once.
     *
     * <p>Ancestors are created as {@code PERSISTENT} nodes with {@code OPEN_ACL_UNSAFE} and no data.
     *
     * @param path path of the node to create
     * @param data node payload; may be {@code null}
     * @param createMode ZooKeeper create mode for the requested node
     * @param createParent whether a {@code NONODE} failure should trigger creation of ancestors
     * @param acl ACLs for the requested node
     * @param ignoreNodeExists whether {@code NODEEXISTS} counts as success (used for ancestors)
     * @return a future completed with the created path, or failed with the underlying error
     */
    private OperationFuture<String> doCreate(final String path, final @Nullable byte[] data, final CreateMode createMode, boolean createParent, final List<ACL> acl, final boolean ignoreNodeExists) {
        SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, this.eventExecutor);
        this.getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
        if (!createParent) {
            return createFuture;
        }
        // With parent creation requested, callers get a separate future that is completed only
        // once the whole create -> create-parents -> retry chain has settled.
        final SettableOperationFuture<String> result = SettableOperationFuture.create(path, this.eventExecutor);
        Futures.addCallback(createFuture, new FutureCallback<String>(){

            @Override
            public void onSuccess(String createdPath) {
                result.set(createdPath);
            }

            @Override
            public void onFailure(Throwable t) {
                // Anything other than a missing parent has already completed the result.
                if (updateFailureResult(t, result, path, ignoreNodeExists)) {
                    return;
                }
                String parentPath = getParent(path);
                if (parentPath.isEmpty()) {
                    // The root has no parent to create; surface the original failure.
                    result.setException(t);
                    return;
                }
                Futures.addCallback(DefaultZKClientService.this.doCreate(parentPath, null, CreateMode.PERSISTENT, true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>(){

                    @Override
                    public void onSuccess(String createdParent) {
                        // The parent now exists: retry the requested node once, without recursion.
                        Futures.addCallback(DefaultZKClientService.this.doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>(){

                            @Override
                            public void onSuccess(String pathResult) {
                                result.set(pathResult);
                            }

                            @Override
                            public void onFailure(Throwable retryFailure) {
                                // The retry is final. If the failure left the result untouched
                                // (NONODE, e.g. the parent vanished again), fail it explicitly so
                                // that the returned future cannot stay pending forever.
                                if (!updateFailureResult(retryFailure, result, path, ignoreNodeExists)) {
                                    result.setException(retryFailure);
                                }
                            }
                        });
                    }

                    @Override
                    public void onFailure(Throwable parentFailure) {
                        result.setException(parentFailure);
                    }
                });
            }

            /**
             * Completes {@code target} for every failure except a missing parent.
             *
             * @return {@code true} if {@code target} was completed (with a value or an exception),
             *         {@code false} only for {@code NONODE}, which the caller still has to handle
             */
            private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> target, String requestPath, boolean ignoreExisting) {
                if (!(t instanceof KeeperException)) {
                    target.setException(t);
                    return true;
                }
                KeeperException.Code code = ((KeeperException)t).code();
                if (ignoreExisting && code == KeeperException.Code.NODEEXISTS) {
                    // The node is already there, which is success for the caller. Report it as
                    // handled so no redundant ancestor creation is attempted.
                    target.set(requestPath);
                    return true;
                }
                if (code != KeeperException.Code.NONODE) {
                    target.setException(t);
                    return true;
                }
                return false;
            }

            /**
             * Returns the parent of {@code childPath}: {@code "/"} for a top-level node and the
             * empty string for the root itself.
             */
            private String getParent(String childPath) {
                String parentPath = childPath.substring(0, childPath.lastIndexOf('/'));
                return parentPath.isEmpty() && !"/".equals(childPath) ? "/" : parentPath;
            }
        });
        return result;
    }

    /**
     * Asynchronously checks whether a node exists, optionally leaving a watch.
     *
     * @param path path of the node to check
     * @param watcher watcher to set; may be {@code null}
     * @return a future completed with the stat ZooKeeper reports; {@code NONODE} is treated as a
     *         normal completion rather than an error
     */
    @Override
    public OperationFuture<Stat> exists(String path, Watcher watcher) {
        SettableOperationFuture<Stat> future = SettableOperationFuture.create(path, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, future);
        return future;
    }

    /**
     * Asynchronously fetches the children of a node, optionally leaving a watch.
     *
     * @param path path of the parent node
     * @param watcher watcher to set; may be {@code null}
     * @return a future completed with the children and the node stat
     */
    @Override
    public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
        SettableOperationFuture<NodeChildren> future = SettableOperationFuture.create(path, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, future);
        return future;
    }

    /**
     * Asynchronously reads the data of a node, optionally leaving a watch.
     *
     * @param path path of the node to read
     * @param watcher watcher to set; may be {@code null}
     * @return a future completed with the node data and stat
     */
    @Override
    public OperationFuture<NodeData> getData(String path, Watcher watcher) {
        SettableOperationFuture<NodeData> future = SettableOperationFuture.create(path, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.getData(path, wrapWatcher(watcher), Callbacks.DATA, future);
        return future;
    }

    /**
     * Asynchronously writes the data of a node.
     *
     * @param dataPath path of the node to update
     * @param data new payload
     * @param version version passed to ZooKeeper for the conditional update
     * @return a future completed with the resulting stat
     */
    @Override
    public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
        SettableOperationFuture<Stat> future = SettableOperationFuture.create(dataPath, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.setData(dataPath, data, version, Callbacks.STAT, future);
        return future;
    }

    /**
     * Asynchronously deletes a node.
     *
     * @param deletePath path of the node to delete
     * @param version version passed to ZooKeeper for the conditional delete
     * @return a future completed with the request path once the delete succeeds
     */
    @Override
    public OperationFuture<String> delete(String deletePath, int version) {
        SettableOperationFuture<String> future = SettableOperationFuture.create(deletePath, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.delete(deletePath, version, Callbacks.VOID, future);
        return future;
    }

    /**
     * Asynchronously reads the ACL of a node.
     *
     * @param path path of the node
     * @return a future completed with the ACL list and the node stat
     */
    @Override
    public OperationFuture<ACLData> getACL(String path) {
        SettableOperationFuture<ACLData> future = SettableOperationFuture.create(path, eventExecutor);
        ZooKeeper client = getZooKeeper();
        client.getACL(path, new Stat(), Callbacks.ACL, future);
        return future;
    }

    /**
     * Asynchronously replaces the ACL of a node.
     *
     * @param path path of the node
     * @param acl new ACLs; snapshotted into an immutable list
     * @param version version passed to ZooKeeper for the conditional update
     * @return a future completed with the resulting stat
     */
    @Override
    public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
        SettableOperationFuture<Stat> future = SettableOperationFuture.create(path, eventExecutor);
        ZooKeeper client = getZooKeeper();
        // The ACL copy is taken only after the connection check, as before.
        client.setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, future);
        return future;
    }

    /**
     * Returns a supplier that resolves the current ZooKeeper handle on each call.
     *
     * @return a supplier whose {@code get()} throws when the client is not connected
     */
    @Override
    public Supplier<ZooKeeper> getZooKeeperSupplier() {
        Supplier<ZooKeeper> currentHandle = new Supplier<ZooKeeper>(){

            @Override
            public ZooKeeper get() {
                return getZooKeeper();
            }
        };
        return currentHandle;
    }

    /**
     * Starts the client by delegating to the internal service.
     */
    @Override
    public ListenableFuture<Service.State> start() {
        return serviceDelegate.start();
    }

    /**
     * Starts the client and waits for the result by delegating to the internal service.
     */
    @Override
    public Service.State startAndWait() {
        return serviceDelegate.startAndWait();
    }

    /**
     * Reports whether the internal service is running.
     */
    @Override
    public boolean isRunning() {
        return serviceDelegate.isRunning();
    }

    /**
     * Returns the lifecycle state of the internal service.
     */
    @Override
    public Service.State state() {
        return serviceDelegate.state();
    }

    /**
     * Stops the client by delegating to the internal service.
     */
    @Override
    public ListenableFuture<Service.State> stop() {
        return serviceDelegate.stop();
    }

    /**
     * Stops the client and waits for the result by delegating to the internal service.
     */
    @Override
    public Service.State stopAndWait() {
        return serviceDelegate.stopAndWait();
    }

    /**
     * Registers a lifecycle listener on the internal service.
     *
     * @param listener listener to notify of state changes
     * @param executor executor on which the listener is invoked
     */
    @Override
    public void addListener(Service.Listener listener, Executor executor) {
        serviceDelegate.addListener(listener, executor);
    }

    /**
     * Returns the current ZooKeeper handle.
     *
     * @return the live handle, never {@code null}
     * @throws IllegalArgumentException if no handle is currently held
     */
    private ZooKeeper getZooKeeper() {
        ZooKeeper client = zooKeeper.get();
        if (client == null) {
            throw new IllegalArgumentException("Not connected to zooKeeper.");
        }
        return client;
    }

    /**
     * Wraps a watcher so that its callback runs on the event executor instead of the thread
     * that delivers the event.
     *
     * @param watcher watcher to wrap; may be {@code null}
     * @return the wrapping watcher, or {@code null} when {@code watcher} is {@code null}
     */
    private Watcher wrapWatcher(final Watcher watcher) {
        if (watcher == null) {
            return null;
        }
        return new Watcher(){

            @Override
            public void process(final WatchedEvent event) {
                if (!eventExecutor.isShutdown()) {
                    eventExecutor.execute(new Runnable(){

                        @Override
                        public void run() {
                            try {
                                watcher.process(event);
                            }
                            catch (Throwable failure) {
                                // A failing watcher is logged and otherwise ignored.
                                LOG.error("Watcher throws exception.", failure);
                            }
                        }
                    });
                } else {
                    LOG.debug("Already shutdown. Discarding event: {}", (Object)event);
                }
            }
        };
    }

    /**
     * Builds a private copy of the auth infos so that later changes to the caller's byte arrays
     * do not affect this client.
     *
     * @param authInfos scheme-to-credential pairs; {@code null} credentials are kept as {@code null}
     * @return a new multimap holding cloned credentials
     */
    private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
        ArrayListMultimap<String, byte[]> copy = ArrayListMultimap.create();
        for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
            byte[] original = entry.getValue();
            byte[] cloned = null;
            if (original != null) {
                cloned = original.clone();
            }
            copy.put(entry.getKey(), cloned);
        }
        return copy;
    }

    /**
     * Stateless ZooKeeper async callbacks. Each one expects the operation's
     * {@link SettableOperationFuture} as the {@code ctx} argument and completes it with either
     * the operation result or a {@link KeeperException} built from the result code and the
     * future's request path.
     */
    private static final class Callbacks {
        /** Completes with the created name, falling back to the path when no name is reported. */
        static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, String name) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                boolean noName = name == null || name.isEmpty();
                future.set(noName ? path : name);
            }
        };
        /** Completes with the reported stat. */
        static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(stat);
            }
        };
        /** Like {@link #STAT}, but also treats {@code NONODE} as a normal completion. */
        static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, Stat stat) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK && status != KeeperException.Code.NONODE) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(stat);
            }
        };
        /** Completes with the children list and stat wrapped in a {@link BasicNodeChildren}. */
        static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback(){

            @Override
            public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(new BasicNodeChildren(children, stat));
            }
        };
        /** Completes with the data and stat wrapped in a {@link BasicNodeData}. */
        static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(new BasicNodeData(data, stat));
            }
        };
        /** Completes with the future's own request path. */
        static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(future.getRequestPath());
            }
        };
        /** Completes with the ACL list and stat wrapped in a {@link BasicACLData}. */
        static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback(){

            @Override
            public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
                SettableOperationFuture future = (SettableOperationFuture)ctx;
                KeeperException.Code status = KeeperException.Code.get(rc);
                if (status != KeeperException.Code.OK) {
                    future.setException(KeeperException.create(status, future.getRequestPath()));
                    return;
                }
                future.set(new BasicACLData(acl, stat));
            }
        };

        /** Not instantiable; this class only holds constants. */
        private Callbacks() {
        }
    }

    private final class ServiceDelegate
    extends AbstractService
    implements Watcher {
        /** Task submitted to the event executor on stop; closes the handle and reports the stop. */
        private final Runnable stopTask = this.createStopTask();

        /**
         * Registers a lifecycle listener, run on {@code Threads.SAME_THREAD_EXECUTOR}, that
         * reacts to a stop requested during startup and cleans up after a failure.
         */
        private ServiceDelegate() {
            this.addListener(new Service.Listener(){

                @Override
                public void starting() {
                }

                @Override
                public void running() {
                }

                @Override
                public void stopping(Service.State from) {
                    // Stop was requested while still STARTING: report the start as done.
                    // NOTE(review): presumably needed so the service can proceed to doStop()
                    // without waiting for a ZooKeeper connection — confirm against the
                    // AbstractService version in use.
                    if (from == Service.State.STARTING) {
                        ServiceDelegate.this.notifyStarted();
                    }
                }

                @Override
                public void terminated(Service.State from) {
                }

                @Override
                public void failed(Service.State from, Throwable failure) {
                    // On failure, drop pending event tasks and release the ZooKeeper handle.
                    DefaultZKClientService.this.eventExecutor.shutdownNow();
                    ServiceDelegate.this.closeZooKeeper(DefaultZKClientService.this.zooKeeper.getAndSet(null));
                }
            }, Threads.SAME_THREAD_EXECUTOR);
        }

        /**
         * Creates the single-threaded event executor and opens the first ZooKeeper handle.
         * The service is not marked as started here; that happens when the connection event
         * arrives. An {@link IOException} while creating the handle fails the service.
         */
        @Override
        protected void doStart() {
            LinkedBlockingQueue<Runnable> pendingEvents = new LinkedBlockingQueue<Runnable>();
            ThreadPoolExecutor eventPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, pendingEvents, Threads.createDaemonThreadFactory("zk-client-EventThread"));
            // Tasks submitted after shutdown are dropped silently instead of throwing.
            eventPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            eventExecutor = eventPool;
            ZooKeeper client;
            try {
                client = createZooKeeper();
            }
            catch (IOException e) {
                notifyFailed(e);
                return;
            }
            zooKeeper.set(client);
        }

        /**
         * Queues the stop task on the event executor and then shuts the executor down.
         * The stop task is submitted first so that it is accepted before shutdown begins and
         * runs after the tasks already queued.
         */
        @Override
        protected void doStop() {
            DefaultZKClientService.this.eventExecutor.submit(this.stopTask);
            DefaultZKClientService.this.eventExecutor.shutdown();
        }

        /**
         * Handles events delivered to the session watcher of the ZooKeeper handle.
         *
         * <p>Events are ignored once the service is {@code TERMINATED} or {@code FAILED}. A
         * {@code SyncConnected} event received while {@code STARTING} marks the service as
         * started. An {@code Expired} event received while {@code RUNNING} schedules a
         * reconnect on the event executor. Whatever branch is taken, events of type
         * {@code None} are finally forwarded to all registered connection watchers.
         *
         * <p>NOTE(review): the decompiler (CFR) reported "Removed try catching itself - possible
         * behaviour change" for this method, so the structure may differ slightly from the
         * original source.
         *
         * @param event the event to handle
         */
        @Override
        public void process(WatchedEvent event) {
            Service.State state = this.state();
            if (state == Service.State.TERMINATED || state == Service.State.FAILED) {
                return;
            }
            try {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected && state == Service.State.STARTING) {
                    LOG.debug("Connected to ZooKeeper: {}", (Object)DefaultZKClientService.this.zkStr);
                    this.notifyStarted();
                    return;
                }
                if (event.getState() == Watcher.Event.KeeperState.Expired) {
                    LOG.info("ZooKeeper session expired: {}", (Object)DefaultZKClientService.this.zkStr);
                    // Reconnect only while running; in any other state the expiry is just logged.
                    if (state != Service.State.RUNNING) {
                        return;
                    }
                    DefaultZKClientService.this.eventExecutor.submit(new Runnable(){

                        @Override
                        public void run() {
                            // Re-check: the service may have left RUNNING before this task ran.
                            if (ServiceDelegate.this.state() != Service.State.RUNNING) {
                                return;
                            }
                            try {
                                LOG.info("Reconnect to ZooKeeper due to expiration: {}", (Object)DefaultZKClientService.this.zkStr);
                                // Install the new handle first, then close the one it replaced.
                                ServiceDelegate.this.closeZooKeeper(DefaultZKClientService.this.zooKeeper.getAndSet(ServiceDelegate.this.createZooKeeper()));
                            }
                            catch (IOException e) {
                                ServiceDelegate.this.notifyFailed(e);
                            }
                        }
                    });
                }
            }
            finally {
                // Connection-level events (type None) are fanned out to the connection watchers,
                // including on the early-return paths above.
                if (event.getType() == Watcher.Event.EventType.None) {
                    for (Watcher connectionWatcher : DefaultZKClientService.this.connectionWatchers) {
                        connectionWatcher.process(event);
                    }
                }
            }
        }

        /**
         * Builds the task that finishes a stop: it closes the current ZooKeeper handle and
         * reports the service as stopped, or as failed if that throws.
         *
         * @return the stop task
         */
        private Runnable createStopTask() {
            Runnable task = new Runnable(){

                @Override
                public void run() {
                    try {
                        closeZooKeeper(zooKeeper.getAndSet(null));
                        notifyStopped();
                    }
                    catch (Exception e) {
                        notifyFailed(e);
                    }
                }
            };
            return task;
        }

        /**
         * Opens a new ZooKeeper handle that reports session events to this delegate (through
         * the event executor) and adds all configured auth infos to it.
         *
         * @return the new handle
         * @throws IOException if the ZooKeeper constructor fails
         */
        private ZooKeeper createZooKeeper() throws IOException {
            Watcher sessionWatcher = wrapWatcher(this);
            ZooKeeper client = new ZooKeeper(zkStr, sessionTimeout, sessionWatcher);
            for (Map.Entry<String, byte[]> auth : authInfos.entries()) {
                client.addAuthInfo(auth.getKey(), auth.getValue());
            }
            return client;
        }

        /**
         * Closes the given handle if there is one. An interrupt during close is logged and the
         * thread's interrupt flag is restored.
         *
         * @param zk handle to close; may be {@code null}
         */
        private void closeZooKeeper(@Nullable ZooKeeper zk) {
            if (zk == null) {
                return;
            }
            try {
                zk.close();
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted when closing ZooKeeper", e);
                Thread.currentThread().interrupt();
            }
        }
    }
}

