/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hive.org.apache.commons.lang.StringUtils;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;

/**
 * Replication endpoint that ships batches of WAL entries to a region server
 * ("sink") picked by a {@link ReplicationSinkManager}.
 *
 * <p>Failures are retried with a linearly growing sleep: the base sleep time is
 * multiplied by a counter that stops growing once it reaches the configured
 * maximum multiplier.
 */
@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint
extends HBaseReplicationEndpoint {
    private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);

    /** Connection built from {@link #conf}; closed and cleared in {@link #doStop()}. */
    private HConnection conn;
    /** Private copy of the context configuration, see {@link #init}. */
    private Configuration conf;
    /** Base sleep between retries, in milliseconds ({@code replication.source.sleepforretries}). */
    private long sleepForRetries;
    /** Upper bound for the retry sleep multiplier ({@code replication.source.maxretriesmultiplier}). */
    private int maxRetriesMultiplier;
    /** Sleep multiplier applied after a socket timeout ({@code replication.source.socketTimeoutMultiplier}). */
    private int socketTimeoutMultiplier;
    private MetricsSource metrics;
    private ReplicationSinkManager replicationSinkMgr;
    /** Set once {@link #connectToPeers()} has been run from {@link #replicate}. */
    private boolean peersSelected = false;

    /**
     * Reads the retry settings, opens the connection and creates the sink manager.
     *
     * @param context endpoint context; its metrics object is kept for later updates
     * @throws IOException if the parent initialisation or the connection creation fails
     */
    @Override
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        // Work on a copy so that decorateConf() does not modify the context's configuration.
        this.conf = HBaseConfiguration.create(this.ctx.getConfiguration());
        decorateConf();
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        // The socket-timeout multiplier defaults to the general maximum multiplier.
        this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.maxRetriesMultiplier);
        this.conn = HConnectionManager.createConnection(this.conf);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.metrics = context.getMetrics();
        this.replicationSinkMgr = new ReplicationSinkManager(this.conn, this.ctx.getPeerId(), this, this.conf);
    }

    /**
     * Copies {@code hbase.replication.rpc.codec}, when set to a non-empty value,
     * into {@code hbase.client.rpc.codec} of the private configuration.
     */
    private void decorateConf() {
        String replicationCodec = this.conf.get("hbase.replication.rpc.codec");
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set("hbase.client.rpc.codec", replicationCodec);
        }
    }

    /**
     * Keeps asking the sink manager to choose sinks until at least one is
     * available or this endpoint is no longer running, sleeping between attempts.
     */
    private void connectToPeers() {
        // Return value is unused here; presumably this refreshes the parent's
        // view of the peer region servers -- confirm in HBaseReplicationEndpoint.
        getRegionServers();
        int sleepMultiplier = 1;
        while (isRunning() && this.replicationSinkMgr.getSinks().size() == 0) {
            this.replicationSinkMgr.chooseSinks();
            // Only back off when the attempt above still left us without sinks.
            if (isRunning() && this.replicationSinkMgr.getSinks().size() == 0) {
                if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
                    sleepMultiplier++;
                }
            }
        }
    }

    /**
     * Sleeps for the base retry time multiplied by {@code sleepMultiplier}.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt status is
     * restored before returning so that callers further up can observe it. Note
     * that while the status stays set, later sleeps on this thread return
     * immediately.
     *
     * @param msg reason for sleeping, used for trace logging only
     * @param sleepMultiplier factor applied to the base sleep time
     * @return {@code true} if {@code sleepMultiplier} is still below the
     *         configured maximum multiplier
     */
    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(msg + ", sleeping " + this.sleepForRetries + " times " + sleepMultiplier);
            }
            Thread.sleep(this.sleepForRetries * (long)sleepMultiplier);
        }
        catch (InterruptedException e) {
            // Do not swallow the interruption: put the flag back for the caller.
            Thread.currentThread().interrupt();
            LOG.debug("Interrupted while sleeping between retries");
        }
        return sleepMultiplier < this.maxRetriesMultiplier;
    }

    /**
     * Ships the entries of {@code replicateContext} to a sink, retrying until
     * the call succeeds or this endpoint stops running.
     *
     * @param replicateContext batch to replicate
     * @return {@code true} once the batch was shipped; {@code false} if the
     *         endpoint stopped running before that happened
     */
    @Override
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        List<WAL.Entry> entries = replicateContext.getEntries();
        int sleepMultiplier = 1;
        while (isRunning()) {
            if (!this.peersSelected) {
                connectToPeers();
                this.peersSelected = true;
            }
            if (!isPeerEnabled()) {
                if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
                    sleepMultiplier++;
                }
                continue;
            }
            ReplicationSinkManager.SinkPeer sinkPeer = null;
            try {
                sinkPeer = this.replicationSinkMgr.getReplicationSink();
                AdminProtos.AdminService.BlockingInterface rrs = sinkPeer.getRegionServer();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Replicating " + entries.size() + " entries of total size " + replicateContext.getSize());
                }
                ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new WAL.Entry[entries.size()]));
                // An empty batch has no "last entry"; skip the metric instead of
                // failing with an index error after a successful call.
                if (!entries.isEmpty()) {
                    this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime());
                }
                this.replicationSinkMgr.reportSinkSuccess(sinkPeer);
                return true;
            }
            catch (IOException ioe) {
                this.metrics.refreshAgeOfLastShippedOp();
                if (ioe instanceof RemoteException) {
                    // Inspect the exception that was actually raised on the remote side.
                    ioe = ((RemoteException)ioe).unwrapRemoteException();
                    LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
                    if (ioe instanceof TableNotFoundException && sleepForRetries("A table is missing in the peer cluster. Replication cannot proceed without losing data.", sleepMultiplier)) {
                        sleepMultiplier++;
                    }
                } else if (ioe instanceof SocketTimeoutException) {
                    // Extra sleep with the dedicated socket-timeout multiplier; the
                    // regular retry sleep below still follows.
                    sleepForRetries("Encountered a SocketTimeoutException. Since the call to the remote cluster timed out, which is usually caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier);
                } else if (ioe instanceof ConnectException) {
                    LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
                    this.replicationSinkMgr.chooseSinks();
                } else {
                    LOG.warn("Can't replicate because of a local or network error: ", ioe);
                }
                // sinkPeer is still null when getReplicationSink() itself failed.
                if (sinkPeer != null) {
                    this.replicationSinkMgr.reportBadSink(sinkPeer);
                }
                if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                    sleepMultiplier++;
                }
            }
        }
        return false;
    }

    /**
     * @return {@code true} if the replication peer of this endpoint's context
     *         is in state {@link ReplicationPeer.PeerState#ENABLED}
     */
    protected boolean isPeerEnabled() {
        return this.ctx.getReplicationPeer().getPeerState() == ReplicationPeer.PeerState.ENABLED;
    }

    /**
     * Disconnects, closes the connection if one is open and then reports the
     * endpoint as stopped. A failure to close the connection is logged and does
     * not prevent the stop notification.
     */
    @Override
    protected void doStop() {
        disconnect();
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            }
            catch (IOException e) {
                // Keep the cause in the log so the failure can be diagnosed.
                LOG.warn("Failed to close the connection", e);
            }
        }
        notifyStopped();
    }
}

