/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.service.cdc;

import com.eaio.uuid.UUID;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import com.neeve.aep.AepFlow;
import com.neeve.aep.AepFlowFactory;
import com.neeve.aep.AepInboundSnoTable;
import com.neeve.ci.XRuntime;
import com.neeve.ods.IStoreBinding;
import com.neeve.ods.IStoreObject;
import com.neeve.ods.IStorePersister;
import com.neeve.ods.OdsExpectationNotMetException;
import com.neeve.ods.StoreCommitEntry;
import com.neeve.ods.StorePersisterDescriptor;
import com.neeve.pkt.log.EPktLogCorruptException;
import com.neeve.rog.IRogChangeDataCaptureHandler;
import com.neeve.rog.IRogNode;
import com.neeve.rog.log.RogLog;
import com.neeve.rog.log.RogLogCdcProcessor;
import com.neeve.rog.log.RogLogMetadata;
import com.neeve.service.Alerter;
import com.neeve.service.cdc.DbPersister;
import com.neeve.service.cdc.ICdcTracer;
import com.neeve.service.cdc.IDbReaderCallback;
import com.neeve.trace.Tracer;
import com.neeve.util.UtlThrowable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;

public abstract class Runner
implements Runnable {
    // When false, open() moves straight to Opened without touching the database or log,
    // and run() exits immediately after marking the runner Stopped.
    private final boolean _enabled;
    // Stored and traced at construction; not otherwise read in the visible part of this class.
    private final boolean _disableOnFail;
    // Database side of the runner: opened in openCore(), receives updates/deletes from the
    // CDC handler, is read during log materialization and is used to write heartbeats.
    private final DbPersister _dbPersister;
    // Used by the heartbeat task to raise alerts.
    private final Alerter _alerter;
    // Name passed to StorePersisterDescriptor.load() and RogLog.create(); also used in trace and thread names.
    private final String _logName;
    // "[CDC Runner (<logName>)] " prefix prepended to most trace lines.
    private final String _tracePrefix;
    private final Tracer _tracer;
    // Single-threaded scheduler for the heartbeat task; re-created in openCore() if it was shut down by close().
    private ScheduledThreadPoolExecutor _heartbeatExecutor;
    // Transaction log handle; null until openTransactionLog() succeeds, and reset to null
    // when the log presence expectation is not met.
    private RogLog _log;
    // Processor and handler pair; replaced with fresh instances every time run() finishes a pass.
    private RogLogCdcProcessor _cdcProcessor;
    private CDCHandler _cdcHandler;
    // Thread created by start() that executes run().
    private Thread _thread;
    // Lifecycle state. Not volatile; some transitions are made under synchronized (State.class), others are not.
    private State _state;
    // Step of the open sequence most recently entered by openCore().
    private OpeningState _openingState;
    // Failure that ended the last run() pass (null when it ended cleanly or by interruption).
    private volatile Throwable _stopCause;
    // Number of automatic restarts since the last completed checkpoint or the last start of run().
    private int _cdcRestartedCount = 0;
    // Delay, in milliseconds, that run() sleeps before an automatic restart.
    @Inject
    @Named(value="nv.service.cdc.restartfrequency")
    protected long _cdcRestartFrequency;
    // Maximum number of automatic restarts run() will attempt.
    @Inject
    @Named(value="nv.service.cdc.restartattempts")
    protected int _cdcRestartAttempts;

    protected Runner(Boolean enabled, Boolean disableOnFail, int pollAlertFrequency, String logName, DbPersister dbPersister, Alerter alerter, ICdcTracer tracer) throws Exception {
        this._enabled = enabled;
        this._disableOnFail = disableOnFail;
        this._dbPersister = dbPersister;
        this._alerter = alerter;
        this._logName = logName;
        this._heartbeatExecutor = this.createHeartbeatExecutor();
        this._tracePrefix = "[CDC Runner (" + this._logName + ")] ";
        this._tracer = tracer.getTracer();
        this._state = State.Init;
        this._tracer.log(this._tracePrefix + "Initialized successfully {", Tracer.Level.INFO);
        this._tracer.log(this._tracePrefix + "...enabled=" + this._enabled, Tracer.Level.INFO);
        this._tracer.log(this._tracePrefix + "...disableOnFail=" + this._disableOnFail, Tracer.Level.INFO);
        this._tracer.log(this._tracePrefix + "}", Tracer.Level.INFO);
    }

    /**
     * Builds the single-threaded scheduler used for heartbeats.
     * Delayed tasks still queued at shutdown are not executed.
     *
     * @return a new, idle executor
     */
    private final ScheduledThreadPoolExecutor createHeartbeatExecutor() {
        final ThreadFactory threadFactory = new HeartbeatExecutorThreadFactory();
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, threadFactory);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        return executor;
    }

    /**
     * Builds (but does not throw) the exception describing why an operation is not
     * permitted in the current state.
     *
     * @return an {@link IllegalStateException} whose message names the current state
     * @throws InternalError if the current state is not one of the known constants
     */
    private final IllegalStateException prepareStateValidationErrorException() {
        final String message;
        switch (this._state) {
            case Init:
                message = "CDC runner has not been opened";
                break;
            case Opening:
                message = "CDC runner is opening";
                break;
            case Opened:
                message = "CDC runner is open";
                break;
            case OpenFailed:
                message = "CDC runner open failed";
                break;
            case Started:
                message = "CDC runner is started";
                break;
            case Stopping:
                message = "CDC runner is stopping";
                break;
            case Stopped:
                message = "CDC runner has stopped";
                break;
            case Closed:
                message = "CDC runner is closed";
                break;
            default:
                throw new InternalError("unknown state '" + this._state + "'!!");
        }
        return new IllegalStateException(message);
    }

    /**
     * Creates and opens the transaction log for the given purpose.
     *
     * <p>The log is opened read-only for CDC and read-write for the presence check and for
     * materialization. CDC and the presence check expect the log to exist; materialization
     * expects it to be absent.
     *
     * @param forWhat why the log is being opened
     * @return {@code true} if the log was opened; {@code false} if the presence expectation
     *         was not met, in which case the log is closed (best effort) and {@code _log} is cleared
     * @throws Exception if loading the descriptor, creating the log or opening it fails for
     *         any reason other than an unmet presence expectation
     */
    private final boolean openTransactionLog(OpenTransactionLogFor forWhat) throws Exception {
        StorePersisterDescriptor persisterDescriptor = StorePersisterDescriptor.load((String)this._logName);
        Properties props = new Properties();
        // Properties rejects null values, so only forward the setting when the descriptor has it.
        // NOTE(review): assumes RogLog falls back to its own default when the key is absent - confirm.
        String initialLogLength = persisterDescriptor.getProperty("initialLogLength");
        if (initialLogLength != null) {
            props.setProperty("initialLogLength", initialLogLength);
        }
        props.setProperty("detachedPersist", "false");
        String cdcEnabled = persisterDescriptor.getProperty("cdcEnabled");
        props.setProperty("cdcEnabled", cdcEnabled != null ? cdcEnabled : "false");
        String autoRepair = persisterDescriptor.getProperty("autoRepair");
        props.setProperty("autoRepair", autoRepair != null ? autoRepair : "false");
        String storeRoot = persisterDescriptor.getProperty("storeRoot");
        props.setProperty("storeRoot", storeRoot != null ? storeRoot : XRuntime.getDataDirectory());
        switch (forWhat) {
            case Cdc: {
                props.setProperty("logMode", "r");
                break;
            }
            case LogMaterialization:
            case LogPresenceCheck: {
                props.setProperty("logMode", "rw");
                break;
            }
            default: {
                throw new InternalError("unknown transaction log open reason '" + forWhat + "'");
            }
        }
        this._log = RogLog.create((String)this._logName, (Properties)props);
        switch (forWhat) {
            case Cdc:
            case LogPresenceCheck: {
                this._log.setStartupExpectation(IStorePersister.StartupExpectation.LogPresence, (Enum)IStorePersister.LogPresenceExpectation.Present);
                break;
            }
            case LogMaterialization: {
                this._log.setStartupExpectation(IStorePersister.StartupExpectation.LogPresence, (Enum)IStorePersister.LogPresenceExpectation.Absent);
                break;
            }
            default: {
                throw new InternalError("unknown transaction log open reason '" + forWhat + "'");
            }
        }
        try {
            this._log.open();
            return true;
        }
        catch (OdsExpectationNotMetException e) {
            try {
                this._log.close();
            }
            catch (Throwable e1) {
                this._tracer.log(this._tracePrefix + "*** Failure during persister close [" + e1.toString() + "] ***", Tracer.Level.WARNING);
            }
            finally {
                // Drop the reference even when close() failed so nobody uses a half-closed log.
                this._log = null;
            }
            return false;
        }
    }

    /**
     * Rebuilds the transaction log from the database when no log exists.
     *
     * <p>A fresh flow and inbound sequence-number table are written first, then every object
     * the database persister reads for that flow. On failure the log and metadata files are
     * deleted and the exception is rethrown.
     *
     * @return the transaction id of the last entry written (the materializer's next id minus one)
     * @throws Exception if a log is unexpectedly present or if materialization fails
     */
    private final long materializeTransactionLog() throws Exception {
        this._tracer.log(this._tracePrefix + "Transaction log does not exist. Materializing from database...", Tracer.Level.INFO);
        if (!this.openTransactionLog(OpenTransactionLogFor.LogMaterialization)) {
            throw new Exception("*** [Internal Error] Materialization invoked with a transaction log present! ***");
        }
        try {
            final LogMaterializer sink = new LogMaterializer();

            // Seed the log with the root flow and its inbound sequence-number table.
            final AepFlow rootFlow = AepFlowFactory.createAepFlow();
            sink.onRead((IRogNode)rootFlow);
            final AepInboundSnoTable snoTable = AepFlowFactory.createAepInboundSnoTable();
            snoTable.setParentId(rootFlow.getId());
            sink.onRead((IRogNode)snoTable);

            // Stream everything else from the database into the log.
            this._dbPersister.read(rootFlow.getId(), sink);
            this._tracer.log(this._tracePrefix + "Materialization complete.", Tracer.Level.INFO);
            sink.stats();

            this._log.sync();
            this._log.close();
            return sink._transactionId - 1L;
        }
        catch (Exception failure) {
            // Remove the partially written log so the next attempt starts clean.
            this._log.getLogFile().delete();
            this._log.getMetadataFile().delete();
            throw failure;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the open sequence: database connection, log presence check (materializing the
     * log from the database when absent), CDC processor creation and heartbeat scheduling.
     *
     * <p>Before each step the state is checked; if a stop was requested in the meantime the
     * sequence aborts with an "open stopped" exception. On success the state becomes
     * {@link State#Opened}. Any failure is traced at SEVERE with its stack trace and rethrown.
     *
     * @throws Exception if any step fails or the open was stopped
     */
    private final void openCore() throws Exception {
        this._tracer.log(this._tracePrefix + "Opening...", Tracer.Level.INFO);
        try {
            this._openingState = OpeningState.OpeningDbConnection;
            if (State.Stopping == this._state) {
                throw new Exception("open stopped");
            }
            this._dbPersister.open();

            this._openingState = OpeningState.CheckingLogPresence;
            if (State.Stopping == this._state) {
                throw new Exception("open stopped");
            }
            final boolean logPresent = this.openTransactionLog(OpenTransactionLogFor.LogPresenceCheck);
            if (!logPresent) {
                this._openingState = OpeningState.MaterializingLog;
                if (State.Stopping == this._state) {
                    throw new Exception("open stopped");
                }
                final long materializedVersion = this.materializeTransactionLog();

                this._openingState = OpeningState.OpeningTransactionLog;
                if (State.Stopping == this._state) {
                    throw new Exception("open stopped");
                }
                if (!this.openTransactionLog(OpenTransactionLogFor.Cdc)) {
                    throw new Exception("*** [Internal Error] Transaction log absent even after materialization! ***");
                }
                this._log.getMetadata().setCdcCheckpointVersion(materializedVersion);
            }

            this._openingState = OpeningState.OpeningCdcProcessor;
            if (State.Stopping == this._state) {
                throw new Exception("open stopped");
            }
            final CDCHandler handler = new CDCHandler();
            this._cdcHandler = handler;
            this._cdcProcessor = this._log.createCdcProcessor((IRogChangeDataCaptureHandler)handler);

            // close() shuts the executor down; a re-open needs a fresh one.
            ScheduledThreadPoolExecutor executor = this._heartbeatExecutor;
            if (executor.isShutdown()) {
                executor = this.createHeartbeatExecutor();
                this._heartbeatExecutor = executor;
            }
            executor.scheduleWithFixedDelay(new HeartbeatExecutor(), 1L, 1L, TimeUnit.SECONDS);

            synchronized (State.class) {
                if (State.Stopping == this._state) {
                    throw new Exception("open stopped");
                }
                this._state = State.Opened;
                this._tracer.log(this._tracePrefix + "Successfully opened.", Tracer.Level.INFO);
            }
        }
        catch (Exception failure) {
            final StringBuilder report = new StringBuilder()
                .append("Open failed [" + failure.toString() + "]\n")
                .append("Stack trace:\n")
                .append(UtlThrowable.prepareStackTrace((Throwable)failure));
            this._tracer.log(report.toString(), Tracer.Level.SEVERE);
            throw failure;
        }
    }

    /**
     * Returns the runner's current lifecycle state.
     *
     * @return the value of the state field at the time of the call
     */
    public final State state() {
        final State current = this._state;
        return current;
    }

    /**
     * Returns the step of the open sequence most recently entered.
     *
     * @return the opening step, or {@code null} if no open has been attempted yet
     */
    public final OpeningState openingState() {
        final OpeningState step = this._openingState;
        return step;
    }

    /**
     * Opens the runner. Permitted only from {@link State#Init} or {@link State#Closed}.
     *
     * <p>A disabled runner moves straight to {@link State#Opened}. An enabled runner runs the
     * open sequence either on the calling thread or, when {@code async} is set, on a new thread.
     * If the sequence fails, the state becomes {@link State#Stopped} when a stop was requested
     * during the open and {@link State#OpenFailed} otherwise; this now applies to the
     * synchronous path as well, so a failed runner can be closed and re-opened.
     *
     * @param async {@code true} to run the open sequence on a background thread
     * @throws IllegalStateException if the runner is not in a state that allows opening
     * @throws Exception if a synchronous open fails
     */
    public final void open(boolean async) throws Exception {
        if (this._state != State.Init && this._state != State.Closed) {
            throw this.prepareStateValidationErrorException();
        }
        this._state = State.Opening;
        if (!this._enabled) {
            this._state = State.Opened;
            return;
        }
        if (async) {
            new Thread(){

                @Override
                public final void run() {
                    try {
                        Runner.this.openCore();
                    }
                    catch (Throwable e) {
                        // openCore() has already traced the failure; only the state needs recording.
                        Runner.this.recordOpenFailure();
                    }
                }
            }.start();
            return;
        }
        boolean opened = false;
        try {
            this.openCore();
            opened = true;
        }
        finally {
            if (!opened) {
                this.recordOpenFailure();
            }
        }
    }

    /**
     * Records the outcome of a failed open attempt under the same lock the other state
     * transitions use. Kept as a method of the outer class so that {@code State} resolves to
     * the runner's own enum rather than {@code Thread.State}, which is what the simple name
     * means inside an anonymous {@code Thread} subclass.
     */
    private void recordOpenFailure() {
        synchronized (State.class) {
            this._state = this._state == State.Stopping ? State.Stopped : State.OpenFailed;
        }
    }

    /**
     * Starts the CDC runner thread. Permitted from {@link State#Opened} or
     * {@link State#Stopped}; a no-op (with a trace line) when already started.
     *
     * <p>The state is switched to {@link State#Started} before the thread is launched. The
     * runner thread itself moves the state to {@code Stopped} when it finishes, so writing
     * {@code Started} after launching could overwrite that and leave a dead runner marked as started.
     *
     * @throws IllegalStateException if the runner is in a state that does not allow starting
     */
    public final void start() {
        this._tracer.log(this._tracePrefix + "Starting (state='" + this._state + "')...", Tracer.Level.INFO);
        switch (this._state) {
            case Init:
            case Opening:
            case OpenFailed:
            case Stopping:
            case Closed: {
                throw this.prepareStateValidationErrorException();
            }
            case Opened:
            case Stopped: {
                final State previous = this._state;
                final Thread runnerThread = new Thread(this);
                runnerThread.setName("X-Eagle-CDCRunner-" + this._logName);
                this._thread = runnerThread;
                this._state = State.Started;
                boolean launched = false;
                try {
                    runnerThread.start();
                    launched = true;
                }
                finally {
                    if (!launched) {
                        // The thread never ran, so nothing else can have changed the state.
                        this._state = previous;
                    }
                }
                break;
            }
            case Started: {
                this._tracer.log(this._tracePrefix + "Already started.", Tracer.Level.INFO);
                break;
            }
            default: {
                throw new IllegalStateException("unknown state '" + this._state + "'");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Stops the runner.
     *
     * <ul>
     * <li>From Init, OpenFailed or Opened the state simply becomes Stopped.</li>
     * <li>While Opening, the state is set to Stopping so the open sequence aborts at its next
     *     check (or to Stopped if the open completed in the meantime).</li>
     * <li>While Started, the CDC processor is closed and the runner thread is interrupted;
     *     unless {@code async} is set or the caller is the runner thread itself, the call then
     *     waits for the runner thread to finish.</li>
     * <li>Stopping, Stopped and Closed only produce a trace line.</li>
     * </ul>
     *
     * <p>The interrupt and join are performed after the state lock has been released, so a
     * blocking join never holds the class-wide monitor. If the waiting caller is interrupted,
     * a warning is traced and the caller's interrupt flag is restored.
     *
     * @param async {@code true} to return without waiting for the runner thread to finish
     * @throws IllegalStateException if the state is not one of the known constants
     */
    public final void stop(boolean async) {
        this._tracer.log(this._tracePrefix + "Stopping (state='" + this._state + "', async=" + async + ")...", Tracer.Level.INFO);
        switch (this._state) {
            case Init:
            case OpenFailed:
            case Opened: {
                this._state = State.Stopped;
                this._tracer.log(this._tracePrefix + "Stopped successfully.", Tracer.Level.INFO);
                return;
            }
            case Opening: {
                synchronized (State.class) {
                    // Re-check under the lock: the open may have completed since the outer read.
                    switch (this._state) {
                        case Opening: {
                            this._state = State.Stopping;
                            break;
                        }
                        case Opened: {
                            this._tracer.log(this._tracePrefix + "Stopped successfully.", Tracer.Level.INFO);
                            this._state = State.Stopped;
                            break;
                        }
                        default: {
                            break;
                        }
                    }
                }
                return;
            }
            case Started: {
                synchronized (State.class) {
                    switch (this._state) {
                        case Started: {
                            this._state = State.Stopping;
                            this._cdcProcessor.close();
                            break;
                        }
                        case Stopping: {
                            this._tracer.log(this._tracePrefix + "Already stopping.", Tracer.Level.INFO);
                            break;
                        }
                        default: {
                            break;
                        }
                    }
                }
                final Thread runnerThread = this._thread;
                if (Thread.currentThread() == runnerThread) {
                    // Called from the runner thread itself: it cannot interrupt or join itself.
                    return;
                }
                runnerThread.interrupt();
                if (async) {
                    return;
                }
                try {
                    runnerThread.join();
                }
                catch (InterruptedException e) {
                    this._tracer.log(this._tracePrefix + "*** The synchronous stopping of the CDC processor thread was interrupted ***", Tracer.Level.WARNING);
                    Thread.currentThread().interrupt();
                }
                return;
            }
            case Stopping: {
                this._tracer.log(this._tracePrefix + "Already stopping.", Tracer.Level.INFO);
                return;
            }
            case Stopped: {
                this._tracer.log(this._tracePrefix + "Already stopped.", Tracer.Level.INFO);
                return;
            }
            case Closed: {
                this._tracer.log(this._tracePrefix + "Already closed.", Tracer.Level.INFO);
                return;
            }
            default: {
                throw new IllegalStateException("unknown state '" + this._state + "'");
            }
        }
    }

    /**
     * Enables CDC in the transaction log metadata. Permitted only while the runner is
     * {@link State#Opened} or {@link State#Stopped}.
     *
     * @throws IllegalStateException if the runner is in any other state
     */
    public final void enable() {
        this._tracer.log(this._tracePrefix + "Enabling (state='" + this._state + "')...", Tracer.Level.INFO);
        final State current = this._state;
        if (current != State.Opened && current != State.Stopped) {
            throw this.prepareStateValidationErrorException();
        }
        final RogLogMetadata metadata = this._log.getMetadata();
        metadata.enableCdc();
    }

    /**
     * Disables CDC in the transaction log metadata. Permitted only while the runner is
     * {@link State#Opened} or {@link State#Stopped}.
     *
     * @throws IllegalStateException if the runner is in any other state
     */
    public final void disable() {
        this._tracer.log(this._tracePrefix + "Disabling (state='" + this._state + "')...", Tracer.Level.INFO);
        final State current = this._state;
        if (current != State.Opened && current != State.Stopped) {
            throw this.prepareStateValidationErrorException();
        }
        final RogLogMetadata metadata = this._log.getMetadata();
        metadata.disableCdc();
    }

    /**
     * Traces a summary of the transaction log metadata (log id, live log number and the CDC
     * enabled flag, log number, cursor and checkpoint version) at INFO level.
     *
     * <p>When no transaction log is currently held (the runner was never opened, is disabled,
     * or the last presence check found no log) a single line saying so is traced instead of
     * failing with a {@code NullPointerException}.
     */
    public final void info() {
        final RogLog log = this._log;
        if (log == null) {
            this._tracer.log(this._tracePrefix + "Transaction log is not open; no metadata to report.", Tracer.Level.INFO);
            return;
        }
        RogLogMetadata metadata = log.getMetadata();
        StringBuilder sb = new StringBuilder();
        sb.append("\n");
        sb.append("Transaction Log Metadata {\n");
        sb.append("....Log Id................").append(metadata.getLogUUID().toString()).append("\n");
        sb.append("....Live Log Number.......").append(metadata.getLiveLogNumber()).append("\n");
        sb.append("....CDC...................").append(metadata.isCdcEnabled() ? "enabled" : "disabled").append("\n");
        sb.append("......Log Number..........").append(metadata.getCdcLogNumber()).append("\n");
        sb.append("......Cursor..............").append(metadata.getCdcCursor()).append("\n");
        sb.append("......Checkpoint Version..").append(metadata.getCdcCheckpointVersion()).append("\n");
        sb.append("}\n");
        this._tracer.log(this._tracePrefix + sb.toString(), Tracer.Level.INFO);
    }

    /**
     * Closes the runner. Permitted from {@link State#OpenFailed}, {@link State#Stopped} and
     * {@link State#Closed} (where it only traces).
     *
     * <p>From Stopped, an enabled runner also shuts down the heartbeat executor and closes
     * the database persister.
     *
     * @throws IllegalStateException if the runner is in any other state
     */
    public final void close() {
        this._tracer.log(this._tracePrefix + "Closing (state='" + this._state + "')...", Tracer.Level.INFO);
        final State current = this._state;
        if (current == State.Stopped) {
            if (this._enabled) {
                this._heartbeatExecutor.shutdown();
                this._dbPersister.close();
            }
            this._state = State.Closed;
        } else if (current == State.OpenFailed) {
            this._state = State.Closed;
        } else if (current != State.Closed) {
            throw this.prepareStateValidationErrorException();
        }
        this._tracer.log(this._tracePrefix + "Closed successfully.", Tracer.Level.INFO);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    /**
     * Body of the runner thread created by start().
     *
     * <p>A disabled runner only traces a line, marks itself Stopped and returns. Otherwise the
     * method loops: each pass invokes the handler's onStart() and the CDC processor's run(),
     * records any failure other than an InterruptedException as the stop cause, marks the
     * runner Stopped and replaces the processor/handler pair with fresh instances. The loop
     * repeats only when the stop cause is a NullPointerException or an EPktLogCorruptException,
     * the restart budget is not exhausted and the state is still Stopped after the restart delay.
     *
     * <p>NOTE(review): in this decompiled text the monitor enter/exit instructions on
     * State.class survive only as comments, so the state changes below are not synchronized
     * as written.
     */
    @Override
    public final void run() {
        if (!this._enabled) {
            this._tracer.log(this._tracePrefix + "Runner not enabled.", Tracer.Level.INFO);
            this._state = State.Stopped;
            return;
        }
        // A fresh start gets a fresh restart budget.
        this._cdcRestartedCount = 0;
        while (true) {
            StringBuilder sb;
            Class<State> clazz;
            try {
                this._stopCause = null;
                try {
                    this._tracer.log(this._tracePrefix + "Runner has started.", Tracer.Level.INFO);
                    this._cdcHandler.onStart();
                    // NOTE(review): presumably blocks until the processor is closed or fails - confirm.
                    this._cdcProcessor.run();
                    clazz = State.class;
                    // MONITORENTER : com.neeve.service.cdc.Runner$State.class
                    this._state = State.Stopping;
                    // MONITOREXIT : clazz
                    // Surface an error the processor reported through onLogComplete().
                    if (this._cdcHandler.getErrorCause() != null) {
                        throw this._cdcHandler.getErrorCause();
                    }
                }
                catch (Throwable e) {
                    // An InterruptedException is not recorded as a failure.
                    if (!(e instanceof InterruptedException)) {
                        this._stopCause = e;
                        sb = new StringBuilder();
                        sb.append("Runner failed [" + e.toString() + "].\n");
                        sb.append("Stack trace:\n");
                        sb.append(UtlThrowable.prepareStackTrace((Throwable)e));
                        this._tracer.log(sb.toString(), Tracer.Level.SEVERE);
                    }
                }
            }
            finally {
                this._state = State.Stopped;
                this._tracer.log(this._tracePrefix + "Runner has stopped.", Tracer.Level.INFO);
                try {
                    // Replace the processor and handler so a later start() or restart uses new instances.
                    this._cdcProcessor.close();
                    this._cdcHandler = new CDCHandler();
                    this._cdcProcessor = this._log.createCdcProcessor((IRogChangeDataCaptureHandler)this._cdcHandler);
                }
                catch (Throwable e) {
                    sb = new StringBuilder();
                    sb.append("Failed to create new instance of CDC processor [" + e.toString() + "].\n");
                    sb.append("Stack trace:\n");
                    sb.append(UtlThrowable.prepareStackTrace((Throwable)e));
                    this._tracer.log(sb.toString(), Tracer.Level.SEVERE);
                }
            }
            // Restart decision: only after a NullPointerException or EPktLogCorruptException,
            // and only while the restart budget lasts.
            if (this._stopCause == null) return;
            if (!(this._stopCause instanceof NullPointerException)) {
                if (!(this._stopCause instanceof EPktLogCorruptException)) return;
            }
            if (this._cdcRestartedCount >= this._cdcRestartAttempts) return;
            this._tracer.log(this._tracePrefix + "Runner will be restarted in " + this._cdcRestartFrequency + " millis.", Tracer.Level.INFO);
            try {
                Thread.sleep(this._cdcRestartFrequency);
            }
            catch (InterruptedException ex) {
                // The interruption only shortens the delay; the restart check below still runs.
                this._tracer.log("Cdc restart sleep interrupted!", Tracer.Level.WARNING);
            }
            this._tracer.log(this._tracePrefix + "Restarting Runner, attempt# " + ++this._cdcRestartedCount, Tracer.Level.INFO);
            clazz = State.class;
            // MONITORENTER : com.neeve.service.cdc.Runner$State.class
            // Abandon the restart if the state changed while this thread was sleeping.
            if (this._state != State.Stopped) {
                this._tracer.log(this._tracePrefix + "State changed to " + (Object)((Object)this._state) + " before restart could be done", Tracer.Level.WARNING);
                // MONITOREXIT : clazz
                return;
            }
            this._state = State.Started;
            // MONITOREXIT : clazz
        }
    }

    /**
     * Steps of the open sequence, in the order the open sequence enters them.
     */
    public static enum OpeningState {
        /** The database persister is being opened. */
        OpeningDbConnection,
        /** The transaction log is being opened to find out whether it exists. */
        CheckingLogPresence,
        /** No log was found; it is being materialized from the database. */
        MaterializingLog,
        /** The freshly materialized log is being opened for CDC. */
        OpeningTransactionLog,
        /** The CDC handler and processor are being created and the heartbeat scheduled. */
        OpeningCdcProcessor
    }

    /**
     * Lifecycle states of the runner.
     */
    public static enum State {
        /** Constructed but never opened. */
        Init,
        /** An open has been requested and has not finished yet. */
        Opening,
        /** An open attempt failed without a stop having been requested. */
        OpenFailed,
        /** Open completed (or the runner is disabled); the runner can be started. */
        Opened,
        /** The runner thread has been started. */
        Started,
        /** A stop has been requested, or the CDC processor has returned, and shutdown is in progress. */
        Stopping,
        /** The runner is not running; it can be started again or closed. */
        Stopped,
        /** The runner has been closed; it can be opened again. */
        Closed
    }

    private final class CDCHandler
    implements IRogChangeDataCaptureHandler {
        private long _currentCDCVersion;
        private IRogChangeDataCaptureHandler.LogCompletionReason logCompletionReason;
        private Throwable errorCause;

        final void onStart() throws Exception {
        }

        final IRogChangeDataCaptureHandler.LogCompletionReason getLogCompletionReason() {
            return this.logCompletionReason;
        }

        final Throwable getErrorCause() {
            return this.errorCause;
        }

        public final void onLogStart(int logNumber) {
            Runner.this._tracer.log(Runner.this._tracePrefix + " Starting CDC on log #" + logNumber, Tracer.Level.INFO);
        }

        public final void onCheckpointStart(long version) {
            this._currentCDCVersion = version;
            Runner.this._tracer.log(Runner.this._tracePrefix + " Starting CDC for checkpoint version=" + version, Tracer.Level.INFO);
        }

        public final boolean handleChange(UUID id, IRogChangeDataCaptureHandler.ChangeType ct, List<IRogNode> list) {
            IRogNode node = list.get(list.size() - 1);
            switch (ct) {
                case Put: 
                case Update: 
                case Send: {
                    Runner.this._tracer.log(Runner.this._tracePrefix + " Received change [object=" + node.getClass().getSimpleName() + ", type=" + ct + ", id=" + id.toString() + "]", Tracer.Level.INFO);
                    try {
                        Runner.this._dbPersister.update(node);
                        return true;
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
                case Remove: {
                    Runner.this._tracer.log(Runner.this._tracePrefix + " Received change [type=" + ct + ", id=" + id.toString() + "]", Tracer.Level.INFO);
                    try {
                        Runner.this._dbPersister.delete(id);
                        return true;
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            Runner.this._tracer.log(Runner.this._tracePrefix + " Received unexpected change [type=" + ct + ", id=" + id.toString() + "]", Tracer.Level.INFO);
            return true;
        }

        public final boolean onCheckpointComplete(long version) {
            Runner.this._tracer.log(Runner.this._tracePrefix + " Completed CDC for checkpoint version=" + version, Tracer.Level.INFO);
            Runner.this._cdcRestartedCount = 0;
            return true;
        }

        public final void onWait() {
        }

        public final void onLogComplete(int logNumber, IRogChangeDataCaptureHandler.LogCompletionReason reason, Throwable errorCause) {
            this.logCompletionReason = reason;
            this.errorCause = errorCause;
            Runner.this._tracer.log(Runner.this._tracePrefix + " Ended CDC on log #" + logNumber + " (reason=" + (reason == IRogChangeDataCaptureHandler.LogCompletionReason.Error ? (errorCause instanceof InterruptedException ? "interrupted" : reason.toString()) : reason.toString()) + ")", Tracer.Level.INFO);
        }
    }

    /**
     * Database reader callback that writes every node it receives into the transaction log
     * as a Put commit entry and keeps per-type counts for reporting.
     */
    private final class LogMaterializer
    implements IDbReaderCallback {
        // Objects written so far, keyed by concrete node class, in first-seen order.
        private final Map<Class<?>, Integer> _countsByType;
        // Single commit entry instance, re-initialised for every node.
        private final StoreCommitEntry commitEntry = StoreCommitEntry.create();
        private int _count = 0;
        // Id used for the next entry; read by the enclosing class after materialization.
        private long _transactionId = 2L;

        LogMaterializer() {
            this._countsByType = new LinkedHashMap<Class<?>, Integer>();
        }

        /** Traces the total number of objects written and the count for each node type. */
        final void stats() {
            Runner.this._tracer.log(Runner.this._tracePrefix + "Total objects materialized = " + this._count, Tracer.Level.INFO);
            for (Map.Entry<Class<?>, Integer> entry : this._countsByType.entrySet()) {
                Runner.this._tracer.log(Runner.this._tracePrefix + "..." + entry.getKey().getSimpleName() + "=" + entry.getValue(), Tracer.Level.INFO);
            }
        }

        /** @return {@code true} once a stop has been requested on the runner */
        @Override
        public final boolean abort() {
            return State.Stopping == Runner.this._state;
        }

        /**
         * Writes {@code node} to the log under the current transaction id, then bumps the
         * per-type count, the total count and the transaction id.
         */
        @Override
        public final void onRead(IRogNode node) {
            Runner.this._log.writeCommitEntry(
                this.commitEntry.init(
                    IStoreBinding.Operation.Put,
                    node.getId(),
                    node.getOfid(),
                    node.getType(),
                    this._transactionId,
                    this._transactionId,
                    this._transactionId,
                    (IStoreObject)node,
                    node.serialize(),
                    node.getContentEncodingType(),
                    true,
                    true),
                false,
                false);

            final Class<?> nodeType = node.getClass();
            final Integer seenSoFar = this._countsByType.get(nodeType);
            final int updated = seenSoFar == null ? 1 : seenSoFar + 1;
            this._countsByType.put(nodeType, updated);

            ++this._count;
            ++this._transactionId;
        }
    }

    /**
     * Reasons for opening the transaction log; each selects a log mode and a presence expectation.
     */
    private static enum OpenTransactionLogFor {
        /** Open read-only ("r") for CDC processing; the log is expected to be present. */
        Cdc,
        /** Open read-write ("rw") to build the log from the database; the log is expected to be absent. */
        LogMaterialization,
        /** Open read-write ("rw") to find out whether the log exists; the log is expected to be present. */
        LogPresenceCheck
    }

    private final class HeartbeatExecutor
    implements Runnable {
        private HeartbeatExecutor() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public final void run() {
            RogLogMetadata metadata;
            try {
                Runner.this._dbPersister.write(Point.measurement((String)"agent_cdc_heartbeat").time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).tag("agent_name", Runner.this._dbPersister.escapeSpaces(Runner.this._dbPersister.getAgentName())).tag("vm_name", Runner.this._dbPersister.escapeSpaces(Runner.this._dbPersister.getServerName())).tag("cdc_module", Runner.this._dbPersister.getModule() == null ? "heartbeats_unknown" : Runner.this._dbPersister.getModule()).addField("value", (double)(Runner.this._state == State.Started ? 1 : -1)).build());
            }
            catch (Throwable e) {
                RogLogMetadata metadata2;
                try {
                    StringBuilder sb = new StringBuilder();
                    sb.append("Failed to send CDC heartbeat (module=" + Runner.this._dbPersister.getModule() + ") [" + e.toString() + "]\n");
                    sb.append("Stack trace:\n");
                    sb.append(UtlThrowable.prepareStackTrace((Throwable)e));
                    Runner.this._tracer.log(sb.toString(), Tracer.Level.SEVERE);
                }
                catch (Throwable throwable) {
                    RogLogMetadata metadata3;
                    State state = Runner.this._state;
                    Throwable stopCause = Runner.this._stopCause;
                    if (state == State.Stopped && stopCause != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("[CDC Runner (" + Runner.this._logName + ")] has stopped\n");
                        sb.append("Cause:\n");
                        sb.append(UtlThrowable.prepareStackTrace((Throwable)Runner.this._stopCause));
                        Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
                    }
                    if ((metadata3 = Runner.this._log.getMetadata()).getLiveLogNumber() - metadata3.getCdcLogNumber() >= 4) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("[CDC Runner (" + Runner.this._logName + ")] CDC log number is falling behind live log number {\n");
                        sb.append("....Log Id................").append(metadata3.getLogUUID().toString()).append("\n");
                        sb.append("....Live Log Number.......").append(metadata3.getLiveLogNumber()).append("\n");
                        sb.append("....CDC...................").append(metadata3.isCdcEnabled() ? "enabled" : "disabled").append("\n");
                        sb.append("......Log Number..........").append(metadata3.getCdcLogNumber()).append("\n");
                        sb.append("......Cursor..............").append(metadata3.getCdcCursor()).append("\n");
                        sb.append("......Checkpoint Version..").append(metadata3.getCdcCheckpointVersion()).append("\n");
                        sb.append("}\n");
                        Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
                    }
                    throw throwable;
                }
                State state = Runner.this._state;
                Throwable stopCause = Runner.this._stopCause;
                if (state == State.Stopped && stopCause != null) {
                    StringBuilder sb = new StringBuilder();
                    sb.append("[CDC Runner (" + Runner.this._logName + ")] has stopped\n");
                    sb.append("Cause:\n");
                    sb.append(UtlThrowable.prepareStackTrace((Throwable)Runner.this._stopCause));
                    Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
                }
                if ((metadata2 = Runner.this._log.getMetadata()).getLiveLogNumber() - metadata2.getCdcLogNumber() >= 4) {
                    StringBuilder sb = new StringBuilder();
                    sb.append("[CDC Runner (" + Runner.this._logName + ")] CDC log number is falling behind live log number {\n");
                    sb.append("....Log Id................").append(metadata2.getLogUUID().toString()).append("\n");
                    sb.append("....Live Log Number.......").append(metadata2.getLiveLogNumber()).append("\n");
                    sb.append("....CDC...................").append(metadata2.isCdcEnabled() ? "enabled" : "disabled").append("\n");
                    sb.append("......Log Number..........").append(metadata2.getCdcLogNumber()).append("\n");
                    sb.append("......Cursor..............").append(metadata2.getCdcCursor()).append("\n");
                    sb.append("......Checkpoint Version..").append(metadata2.getCdcCheckpointVersion()).append("\n");
                    sb.append("}\n");
                    Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
                }
            }
            State state = Runner.this._state;
            Throwable stopCause = Runner.this._stopCause;
            if (state == State.Stopped && stopCause != null) {
                StringBuilder sb = new StringBuilder();
                sb.append("[CDC Runner (" + Runner.this._logName + ")] has stopped\n");
                sb.append("Cause:\n");
                sb.append(UtlThrowable.prepareStackTrace((Throwable)Runner.this._stopCause));
                Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
            }
            if ((metadata = Runner.this._log.getMetadata()).getLiveLogNumber() - metadata.getCdcLogNumber() >= 4) {
                StringBuilder sb = new StringBuilder();
                sb.append("[CDC Runner (" + Runner.this._logName + ")] CDC log number is falling behind live log number {\n");
                sb.append("....Log Id................").append(metadata.getLogUUID().toString()).append("\n");
                sb.append("....Live Log Number.......").append(metadata.getLiveLogNumber()).append("\n");
                sb.append("....CDC...................").append(metadata.isCdcEnabled() ? "enabled" : "disabled").append("\n");
                sb.append("......Log Number..........").append(metadata.getCdcLogNumber()).append("\n");
                sb.append("......Cursor..............").append(metadata.getCdcCursor()).append("\n");
                sb.append("......Checkpoint Version..").append(metadata.getCdcCheckpointVersion()).append("\n");
                sb.append("}\n");
                Runner.this._alerter.alert(sb.toString(), null, Alerter.Severity.Warning, "CDC");
            }
        }
    }

    /**
     * Thread factory that gives every thread it creates a name of the form
     * {@code X-Eagle-CDC-HeartbeatExecutor-<n>}, where {@code n} starts at 1 and
     * grows by one with each created thread.
     */
    private final class HeartbeatExecutorThreadFactory
    implements ThreadFactory {
        /** Sequence number of the most recently created thread; 0 until the first thread is created. */
        private int num;

        private HeartbeatExecutorThreadFactory() {
        }

        /**
         * Creates a new thread for the given task and assigns it the next sequential name.
         *
         * @param r the task the new thread will run
         * @return the newly created, named (not yet started) thread
         */
        @Override
        public final Thread newThread(Runnable r) {
            this.num += 1;
            String threadName = "X-Eagle-CDC-HeartbeatExecutor-" + this.num;
            Thread worker = new Thread(r);
            worker.setName(threadName);
            return worker;
        }
    }
}

