/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal.kafka.client;

import com.google.gson.Gson;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.com.google.common.base.Charsets;
import org.apache.hive.com.google.common.base.Function;
import org.apache.hive.com.google.common.base.Joiner;
import org.apache.hive.com.google.common.base.Preconditions;
import org.apache.hive.com.google.common.base.Supplier;
import org.apache.hive.com.google.common.base.Suppliers;
import org.apache.hive.com.google.common.base.Throwables;
import org.apache.hive.com.google.common.cache.CacheBuilder;
import org.apache.hive.com.google.common.cache.CacheLoader;
import org.apache.hive.com.google.common.cache.LoadingCache;
import org.apache.hive.com.google.common.collect.ImmutableList;
import org.apache.hive.com.google.common.collect.Iterables;
import org.apache.hive.com.google.common.collect.Sets;
import org.apache.hive.com.google.common.primitives.Ints;
import org.apache.hive.com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hive.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.com.google.common.util.concurrent.Futures;
import org.apache.hive.com.google.common.util.concurrent.SettableFuture;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.TopicPartition;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;

/**
 * A {@link BrokerService} that discovers Kafka brokers and partition leaders by reading and
 * watching the Kafka nodes in ZooKeeper ({@code /brokers/ids} and {@code /brokers/topics}).
 *
 * <p>Broker and partition information is kept in two loading caches. Each cached value is a
 * {@link Supplier} backed by an {@link AtomicReference} that is refreshed by a ZooKeeper data
 * watch, so a cached entry follows later data changes of its node.
 */
public final class ZKBrokerService extends AbstractIdleService implements BrokerService {
    private static final Logger LOG = LoggerFactory.getLogger(ZKBrokerService.class);
    private static final String BROKER_IDS_PATH = "/brokers/ids";
    private static final String BROKER_TOPICS_PATH = "/brokers/topics";
    /** Delay, in seconds, before a failed ZooKeeper {@code exists} call is retried. */
    private static final long FAILURE_RETRY_SECONDS = 5L;
    private static final Gson GSON = new Gson();

    /** Converts a child node name under {@code /brokers/ids} into a {@link BrokerId}. */
    private static final Function<String, BrokerId> BROKER_ID_TRANSFORMER = new Function<String, BrokerId>() {
        @Override
        public BrokerId apply(String input) {
            return new BrokerId(Integer.parseInt(input));
        }
    };

    /** Formats a broker as {@code host:port}. */
    private static final Function<BrokerInfo, String> BROKER_INFO_TO_ADDRESS = new Function<BrokerInfo, String>() {
        @Override
        public String apply(BrokerInfo input) {
            return String.format("%s:%d", input.getHost(), input.getPort());
        }
    };

    private final ZKClient zkClient;
    private final LoadingCache<BrokerId, Supplier<BrokerInfo>> brokerInfos;
    private final LoadingCache<KeyPathTopicPartition, Supplier<PartitionInfo>> partitionInfos;
    private final Set<ListenerExecutor> listeners;
    // Created in startUp() and shut down in shutDown(); runs all ZooKeeper callbacks.
    private ExecutorService executorService;
    // Lazily created by getBrokers(); guarded by the instance monitor.
    private Supplier<Iterable<BrokerInfo>> brokerList;

    /**
     * Creates a broker service on top of the given ZooKeeper client.
     *
     * @param zkClient client used for all ZooKeeper reads and watches
     */
    public ZKBrokerService(ZKClient zkClient) {
        this.zkClient = zkClient;
        // The invalidaters use a qualified "this" because the caches are final fields that are
        // still being assigned while the anonymous classes are created.
        this.brokerInfos = CacheBuilder.newBuilder().build(createCacheLoader(new CacheInvalidater<BrokerId>() {
            @Override
            public void invalidate(BrokerId key) {
                ZKBrokerService.this.brokerInfos.invalidate(key);
            }
        }, BrokerInfo.class));
        this.partitionInfos = CacheBuilder.newBuilder().build(createCacheLoader(new CacheInvalidater<KeyPathTopicPartition>() {
            @Override
            public void invalidate(KeyPathTopicPartition key) {
                ZKBrokerService.this.partitionInfos.invalidate(key);
            }
        }, PartitionInfo.class));
        this.listeners = Sets.newCopyOnWriteArraySet();
    }

    @Override
    protected void startUp() throws Exception {
        executorService = Executors.newCachedThreadPool(Threads.createDaemonThreadFactory("zk-kafka-broker"));
    }

    @Override
    protected void shutDown() throws Exception {
        executorService.shutdownNow();
    }

    /**
     * Returns the broker currently leading the given topic partition.
     *
     * @param topic     topic name
     * @param partition partition id
     * @return the leader broker, or {@code null} if no partition state is available
     * @throws IllegalStateException if the service is not running
     */
    @Override
    public BrokerInfo getLeader(String topic, int partition) {
        Preconditions.checkState(isRunning(), "BrokerService is not running.");
        PartitionInfo partitionInfo = partitionInfos.getUnchecked(new KeyPathTopicPartition(topic, partition)).get();
        if (partitionInfo == null) {
            return null;
        }
        return brokerInfos.getUnchecked(new BrokerId(partitionInfo.getLeader())).get();
    }

    /**
     * Returns the known brokers. The first call reads the children of {@code /brokers/ids},
     * installs a children watch that keeps the list current and notifies change listeners,
     * and blocks until that first read completes.
     *
     * @return the current brokers; empty if the broker ids node does not exist yet
     * @throws IllegalStateException if the service is not running
     * @throws RuntimeException      wrapping the failure if the initial read fails or is interrupted
     */
    @Override
    public synchronized Iterable<BrokerInfo> getBrokers() {
        Preconditions.checkState(isRunning(), "BrokerService is not running.");
        if (brokerList != null) {
            return brokerList.get();
        }

        final SettableFuture<Void> readerFuture = SettableFuture.create();
        final AtomicReference<Iterable<BrokerInfo>> brokers =
            new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());

        actOnExists(BROKER_IDS_PATH, new Runnable() {
            @Override
            public void run() {
                // Shared by the initial read and by every re-read triggered from the watcher.
                final FutureCallback<NodeChildren> childrenCallback = new FutureCallback<NodeChildren>() {
                    @Override
                    public void onSuccess(NodeChildren result) {
                        try {
                            Iterable<BrokerId> brokerIds = Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER);
                            brokers.set(ImmutableList.copyOf(Iterables.transform(
                                brokerInfos.getAll(brokerIds).values(),
                                Suppliers.<BrokerInfo>supplierFunction())));
                            readerFuture.set(null);
                            for (ListenerExecutor listener : listeners) {
                                listener.changed(ZKBrokerService.this);
                            }
                        } catch (ExecutionException e) {
                            readerFuture.setException(e.getCause());
                        } catch (RuntimeException e) {
                            // Without this the future would never complete and the caller of
                            // getBrokers() would block forever while holding the monitor.
                            LOG.error("Failed to update broker list from " + BROKER_IDS_PATH, e);
                            readerFuture.setException(e);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        readerFuture.setException(t);
                    }
                };

                Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (!isRunning()) {
                            return;
                        }
                        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                            // Re-read the children and re-register this watcher.
                            Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
                        }
                    }
                }), childrenCallback, executorService);
            }
        }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);

        // NOTE(review): the supplier is cached before the initial read completes, so after a
        // failed first read later calls return the reference's current content (initially an
        // empty list) instead of retrying the read.
        brokerList = createSupplier(brokers);
        try {
            readerFuture.get();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller before propagating.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException e) {
            throw Throwables.propagate(e);
        }
        return brokerList.get();
    }

    /**
     * Returns the brokers as a comma separated list of {@code host:port} entries.
     */
    @Override
    public String getBrokerList() {
        return Joiner.on(',').join(Iterables.transform(getBrokers(), BROKER_INFO_TO_ADDRESS));
    }

    /**
     * Registers a listener that is called, through the given executor, whenever the broker
     * list has been (re)read.
     *
     * @return a {@link Cancellable} that removes the listener again
     */
    @Override
    public Cancellable addChangeListener(BrokerService.BrokerChangeListener listener, Executor executor) {
        final ListenerExecutor listenerExecutor = new ListenerExecutor(listener, executor);
        listeners.add(listenerExecutor);
        return new Cancellable() {
            @Override
            public void cancel() {
                listeners.remove(listenerExecutor);
            }
        };
    }

    /**
     * Creates a cache loader that reads the JSON data of the key's ZooKeeper node, decodes it
     * into {@code resultType}, and keeps the loaded value current through a data watch.
     *
     * @param invalidater removes a key from the cache this loader belongs to
     * @param resultType  type the node data is decoded into
     */
    private <K extends KeyPath, T> CacheLoader<K, Supplier<T>> createCacheLoader(final CacheInvalidater<K> invalidater,
                                                                                 final Class<T> resultType) {
        return new CacheLoader<K, Supplier<T>>() {
            @Override
            public Supplier<T> load(final K key) throws Exception {
                final SettableFuture<T> readyFuture = SettableFuture.create();
                final AtomicReference<T> resultValue = new AtomicReference<T>();
                final String path = key.getPath();

                actOnExists(path, new Runnable() {
                    @Override
                    public void run() {
                        // Shared by the initial read and by every re-read triggered from the watcher.
                        final FutureCallback<NodeData> dataCallback = new FutureCallback<NodeData>() {
                            @Override
                            public void onSuccess(NodeData result) {
                                T value = decodeNodeData(result, resultType);
                                resultValue.set(value);
                                readyFuture.set(value);
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                LOG.error("Failed to fetch node data on {}", path, t);
                                if (t instanceof KeeperException.NoNodeException) {
                                    // A missing node is a valid "no value" result, not an error.
                                    resultValue.set(null);
                                    readyFuture.set(null);
                                    return;
                                }
                                // Drop the entry so that the next lookup loads it again.
                                invalidater.invalidate(key);
                                readyFuture.setException(t);
                            }
                        };

                        Futures.addCallback(zkClient.getData(path, new Watcher() {
                            @Override
                            public void process(WatchedEvent event) {
                                if (!isRunning()) {
                                    return;
                                }
                                if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                                    // Re-read the data and re-register this watcher.
                                    Futures.addCallback(zkClient.getData(path, this), dataCallback, executorService);
                                } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                                    // Evict from the cache this loader serves. Going through the
                                    // invalidater (rather than always brokerInfos) makes sure that
                                    // deleted partition state nodes are evicted as well.
                                    invalidater.invalidate(key);
                                }
                            }
                        }), dataCallback, executorService);
                    }
                }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);

                readyFuture.get();
                return createSupplier(resultValue);
            }
        };
    }

    /**
     * Decodes the UTF-8 JSON payload of a node into the given type.
     *
     * @return the decoded object, or {@code null} if the node or its data is {@code null}
     */
    private <T> T decodeNodeData(NodeData nodeData, Class<T> type) {
        byte[] data = nodeData == null ? null : nodeData.getData();
        if (data == null) {
            return null;
        }
        return GSON.fromJson(new String(data, Charsets.UTF_8), type);
    }

    /**
     * Runs {@code action} once the node at {@code path} exists: immediately if it already
     * exists, otherwise when a {@code NodeCreated} event is received. If the node does not
     * exist, {@code readyFuture} is completed with {@code null} so that callers do not block.
     * A failed {@code exists} call is retried after the given delay for as long as the
     * service is running.
     *
     * @param path        node to check
     * @param action      action to run when the node exists
     * @param readyFuture future the caller waits on
     * @param retryTime   delay before retrying a failed {@code exists} call
     * @param retryUnit   unit of {@code retryTime}
     */
    private void actOnExists(final String path, final Runnable action, final SettableFuture<?> readyFuture,
                             final long retryTime, final TimeUnit retryUnit) {
        Futures.addCallback(zkClient.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (!isRunning()) {
                    return;
                }
                if (event.getType() == Watcher.Event.EventType.NodeCreated) {
                    action.run();
                }
            }
        }), new FutureCallback<Stat>() {
            @Override
            public void onSuccess(Stat result) {
                if (result != null) {
                    action.run();
                } else {
                    readyFuture.set(null);
                }
            }

            @Override
            public void onFailure(final Throwable t) {
                Thread retryThread = new Thread("zk-broker-service-retry") {
                    @Override
                    public void run() {
                        try {
                            retryUnit.sleep(retryTime);
                        } catch (InterruptedException e) {
                            LOG.warn("ZK retry thread interrupted. Action not retried.");
                            Thread.currentThread().interrupt();
                            return;
                        }
                        if (!ZKBrokerService.this.isRunning()) {
                            // The service has stopped: retrying cannot succeed because the
                            // callback executor is shut down. Release any waiting caller.
                            readyFuture.setException(t);
                            return;
                        }
                        ZKBrokerService.this.actOnExists(path, action, readyFuture, retryTime, retryUnit);
                    }
                };
                retryThread.setDaemon(true);
                retryThread.start();
            }
        }, executorService);
    }

    /**
     * Wraps a reference in a {@link Supplier} that always returns the reference's current value.
     */
    private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) {
        return new Supplier<T>() {
            @Override
            public T get() {
                return ref.get();
            }
        };
    }

    /**
     * Dispatches change notifications to a wrapped listener through its executor. Failures of
     * the executor or of the listener are logged and never propagated.
     */
    private static final class ListenerExecutor extends BrokerService.BrokerChangeListener {
        private final BrokerService.BrokerChangeListener listener;
        private final Executor executor;

        private ListenerExecutor(BrokerService.BrokerChangeListener listener, Executor executor) {
            this.listener = listener;
            this.executor = executor;
        }

        @Override
        public void changed(final BrokerService brokerService) {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            listener.changed(brokerService);
                        } catch (Throwable t) {
                            LOG.error("Failure when calling BrokerChangeListener.", t);
                        }
                    }
                });
            } catch (Throwable t) {
                LOG.error("Failure when calling BrokerChangeListener.", t);
            }
        }
    }

    /**
     * Partition state decoded by Gson from the JSON data of a partition state node.
     */
    private static final class PartitionInfo {
        private int[] isr;
        private int leader;

        private PartitionInfo() {
        }

        private int[] getIsr() {
            return isr;
        }

        private int getLeader() {
            return leader;
        }
    }

    /**
     * A {@link TopicPartition} usable as a cache key that knows the path of its state node.
     */
    private static final class KeyPathTopicPartition extends TopicPartition implements KeyPath {
        private KeyPathTopicPartition(String topic, int partition) {
            super(topic, partition);
        }

        @Override
        public String getPath() {
            return String.format("%s/%s/partitions/%d/state", BROKER_TOPICS_PATH, getTopic(), getPartition());
        }
    }

    /**
     * Cache key identifying a broker by its numeric id.
     */
    private static final class BrokerId implements KeyPath {
        private final int id;

        private BrokerId(int id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o != null && getClass() == o.getClass() && id == ((BrokerId) o).id;
        }

        @Override
        public int hashCode() {
            return Ints.hashCode(id);
        }

        @Override
        public String getPath() {
            return BROKER_IDS_PATH + "/" + id;
        }
    }

    /** A cache key that maps to a ZooKeeper node path. */
    private interface KeyPath {
        String getPath();
    }

    /** Callback used by a cache loader to evict a key from the cache it serves. */
    private interface CacheInvalidater<T> {
        void invalidate(T key);
    }
}

