/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.toa.opt.impl;

import com.neeve.aep.AepEngine;
import com.neeve.aep.AepEngineDescriptor;
import com.neeve.aep.annotations.EventHandler;
import com.neeve.aep.event.AepChannelDownEvent;
import com.neeve.aep.event.AepChannelUpEvent;
import com.neeve.rog.IRogMessage;
import com.neeve.server.app.annotations.AppStat;
import com.neeve.sma.MessageBusDescriptor;
import com.neeve.sma.MessageChannel;
import com.neeve.sma.MessageChannelDescriptor;
import com.neeve.sma.MessageView;
import com.neeve.sma.SmaException;
import com.neeve.sma.spi.executor.ExecutorBusProcessor;
import com.neeve.toa.messages.DelayedAckMessage;
import com.neeve.toa.messages.HornetMessageFactory;
import com.neeve.toa.opt.DelayedAcknowledgmentController;
import com.neeve.toa.opt.impl.DelayedAckControllerProcessorFactory;
import com.neeve.trace.Tracer;
import com.neeve.util.UtlPool;
import com.neeve.util.UtlReferenceTracker;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * {@link DelayedAcknowledgmentController} implementation that routes a marker
 * {@link DelayedAckMessage} through a dedicated executor bus so that the bus-level
 * acknowledgment can be withheld until the application acknowledges too.
 *
 * <p>Lifecycle as driven by this class: {@link #initEngineDescriptor(AepEngineDescriptor)}
 * registers the executor bus and channel on the engine descriptor and creates the
 * acknowledger pool, {@link #initEngine(AepEngine)} moves the controller to
 * {@code Started}, and {@link #close()} moves it to {@code Closed}.
 */
public class DelayedAckControllerImpl
implements DelayedAcknowledgmentController {
    private static final Tracer tracer = Tracer.get("nv.toa.delayedack");
    private static final String delayedAckExecutorChannelName = "delayed-ack";

    // Instance-scoped (not static) because DelayedAcknowledgerImpl is an inner class
    // and needs an enclosing controller instance to be constructed.
    private final UtlPool.Factory<DelayedAcknowledgerImpl> acknowledgerFactory = new UtlPool.Factory<DelayedAcknowledgerImpl>(){

        public DelayedAcknowledgerImpl createItem(Object object) {
            return new DelayedAcknowledgerImpl();
        }

        public DelayedAcknowledgerImpl[] createItemArray(int size) {
            return new DelayedAcknowledgerImpl[size];
        }
    };
    private volatile State state = State.Init;
    private volatile AepEngineDescriptor engineDescriptor;
    private volatile AepEngine engine;
    // Engine name captured at init time; deliberately NOT cleared by close() so that
    // acknowledgments arriving after close can still be traced without dereferencing
    // the (by then null) engine descriptor.
    private volatile String engineName;
    private String delayedAckExecutorBusName;
    private UtlPool<DelayedAcknowledgerImpl> delayedAcknowledgePool;
    private volatile MessageChannel delayedAckChannel;
    private final AtomicLong delayedAcksPendingCount = new AtomicLong(0L);

    /**
     * Registers the delayed-ack executor bus and its {@code delayed-ack} channel on the
     * given engine descriptor and creates the pool of delayed acknowledgers.
     *
     * @param engineDescriptor the descriptor of the engine this controller is attached to
     * @throws SmaException declared for the bus/channel descriptor operations performed here
     */
    public void initEngineDescriptor(AepEngineDescriptor engineDescriptor) throws SmaException {
        final String name = engineDescriptor.getName();
        tracer.log("Initializing delayed acknowledgment controller for " + name, Tracer.Level.INFO);
        this.engineDescriptor = engineDescriptor;
        this.engineName = name;

        // Compute the bus name once; the field is what the channel up/down handlers match on.
        final String busName = "hornet-delayed-executor-" + name;
        this.delayedAckExecutorBusName = busName;

        MessageBusDescriptor busDescriptor = MessageBusDescriptor.create(busName);
        busDescriptor.setProviderConfig("executor://" + busName);
        busDescriptor.setProviderConfigProperty("processor_factory_classname", DelayedAckControllerProcessorFactory.class.getName());

        MessageChannelDescriptor channelDescriptor = MessageChannelDescriptor.create(delayedAckExecutorChannelName, busDescriptor);
        channelDescriptor.setChannelQos(MessageChannel.Qos.Guaranteed);
        busDescriptor.addChannel(channelDescriptor);
        busDescriptor.save();

        engineDescriptor.addBus(busName);
        engineDescriptor.setBusManagerProperty(busName, "detachedCommit", "false");
        engineDescriptor.addChannel(busName, delayedAckExecutorChannelName, AepEngineDescriptor.ChannelConfig.from("join=false"));

        this.delayedAcknowledgePool = UtlPool.create("hornet-delayed-ack", name, this.acknowledgerFactory, (UtlPool.Params)UtlPool.Params.create().setThreaded(true));
    }

    /**
     * Binds the controller to its engine, registers the Hornet message factory with it
     * and marks the controller as started.
     *
     * @param engine the engine this controller serves
     */
    public void initEngine(AepEngine engine) {
        this.engine = engine;
        this.engine.registerFactory((Object)HornetMessageFactory.create(null));
        this.state = State.Started;
    }

    /**
     * Captures the delayed-ack channel when the channel named {@code delayed-ack} on this
     * controller's executor bus comes up.
     *
     * @param event the channel up event
     */
    @EventHandler
    public void onChannelUp(AepChannelUpEvent event) {
        if (delayedAckExecutorChannelName.equals(event.getMessageChannel().getName()) && event.getMessageBusBinding().getName().equals(this.delayedAckExecutorBusName)) {
            this.delayedAckChannel = event.getMessageChannel();
        }
    }

    /**
     * Drops the captured delayed-ack channel when the channel named {@code delayed-ack}
     * on this controller's executor bus goes down.
     *
     * @param event the channel down event
     */
    @EventHandler
    public void onChannelDown(AepChannelDownEvent event) {
        if (delayedAckExecutorChannelName.equals(event.getMessageChannel().getName()) && event.getMessageBusBinding().getName().equals(this.delayedAckExecutorBusName)) {
            this.delayedAckChannel = null;
        }
    }

    /**
     * Closes the controller: marks it closed, releases the engine references and closes
     * the acknowledger pool. A no-op when the controller was never initialized or has
     * already been closed.
     */
    public void close() {
        final AepEngineDescriptor descriptor = this.engineDescriptor;
        if (descriptor == null) {
            return;
        }
        tracer.log("Closing delayed acknowledgment controller for " + descriptor.getName(), Tracer.Level.INFO);
        // Publish Closed before tearing anything down so that in-flight acknowledgers
        // stop returning themselves to a pool that is about to be closed.
        this.state = State.Closed;
        this.engine = null;
        this.engineDescriptor = null;
        // The pool is the last thing initEngineDescriptor creates; it is still null if
        // that method failed part-way through.
        final UtlPool<DelayedAcknowledgerImpl> pool = this.delayedAcknowledgePool;
        if (pool != null) {
            pool.close();
        }
    }

    /**
     * @return the number of delayed acknowledgers handed out by
     *         {@link #delayAcknowledgment()} that have not yet been released
     */
    @AppStat(name="hornet.delayedAcksPending")
    public long getDelayedAcksPending() {
        return this.delayedAcksPendingCount.get();
    }

    /**
     * Creates a delayed acknowledger and sends a {@link DelayedAckMessage} carrying it
     * over the delayed-ack channel.
     *
     * <p>The returned acknowledger starts with an ack count of 2: one count is consumed
     * when the executor bus processor hands over its bus acknowledger, the other when the
     * caller invokes {@code acknowledge}. The bus acknowledger is invoked once the count
     * reaches zero.
     *
     * @return the acknowledger the caller must eventually acknowledge
     * @throws IllegalStateException if the controller is not started, the caller is not
     *         on the engine's message dispatch thread, the delayed-ack channel is not up,
     *         or the engine is not a started primary
     * @throws UnsupportedOperationException if the engine has application state for the
     *         current message
     */
    @Override
    public final DelayedAcknowledgmentController.DelayedAcknowledger delayAcknowledgment() {
        if (this.state != State.Started) {
            throw new IllegalStateException("Delayed acknowledgment controller is not Started (" + this.state + ")");
        }
        // Read the volatile field once so all checks below operate on the same engine.
        final AepEngine currentEngine = this.engine;
        if (!currentEngine.isMessageDispatchThread()) {
            throw new IllegalStateException("createDelayedAck cannot be called from outside of a message handler!");
        }
        if (currentEngine.getApplicationState((MessageView)currentEngine.getCurrentMessage()) != null) {
            throw new UnsupportedOperationException("Delayed acknowledgment is not supported for state replication engines with a state repository");
        }
        if (!(currentEngine.isPrimary() && currentEngine.getState() == AepEngine.State.Started)) {
            throw new IllegalStateException("createDelayedAck can only be called from a primary engine in which messaging is started!");
        }

        final MessageChannel channel = this.delayedAckChannel;
        if (channel == null) {
            throw new IllegalStateException("delayedAckChannel is not up!");
        }

        DelayedAcknowledgerImpl ack = ((DelayedAcknowledgerImpl)this.delayedAcknowledgePool.get(null)).initAckCount();
        DelayedAckMessage message = HornetMessageFactory.createDelayedAckMessage();
        message.setAttachment(ack);
        message.setMessageChannelAsRaw(channel.getNameAsRaw());
        // NOTE(review): the bus name is populated from the *channel's* raw name, i.e. the
        // same value as the channel name above. Left unchanged; confirm whether the raw
        // name of the bus was intended here.
        message.setMessageBusAsRaw(channel.getNameAsRaw());
        currentEngine.sendMessage(channel, (IRogMessage)message);
        this.delayedAcksPendingCount.incrementAndGet();
        if (tracer.debug) {
            tracer.log("Created delayed acknowledger for " + currentEngine.getName(), Tracer.Level.DEBUG);
        }
        return ack;
    }

    /**
     * Executor bus processor for the delayed-ack bus. When the processed view is an
     * {@link IRogMessage} whose attachment is a {@link DelayedAcknowledgerImpl}, the bus
     * acknowledger is handed to that attachment; any other view is ignored.
     */
    static final class DelayedAckProcessor
    implements ExecutorBusProcessor {
        DelayedAckProcessor() {
        }

        public void process(MessageView view, ExecutorBusProcessor.Acknowledger acknowledger, int flags) throws Exception {
            if (!(view instanceof IRogMessage)) {
                return;
            }
            Object attachment = ((IRogMessage)view).getAttachment();
            if (attachment instanceof DelayedAcknowledgerImpl) {
                ((DelayedAcknowledgerImpl)attachment).setBusAcknowledger(acknowledger);
            }
        }
    }

    /**
     * Pooled acknowledger that counts down from 2 and invokes the bus acknowledger when
     * the count reaches zero.
     */
    private class DelayedAcknowledgerImpl
    implements UtlPool.Item<DelayedAcknowledgerImpl>,
    DelayedAcknowledgmentController.DelayedAcknowledger {
        private final UtlReferenceTracker refTracker;
        private UtlPool<DelayedAcknowledgerImpl> pool;
        // First non-null status passed to acknowledge(Exception); forwarded on release.
        private volatile Exception status;
        private volatile ExecutorBusProcessor.Acknowledger busAcknowledger;
        private final AtomicInteger ackCountDown = new AtomicInteger(0);

        DelayedAcknowledgerImpl() {
            this.refTracker = UtlReferenceTracker.enabled(this.getClass()) ? new UtlReferenceTracker((Object)this) : null;
            this.init();
        }

        /**
         * Resets this acknowledger for reuse.
         *
         * @return this acknowledger
         * @throws IllegalStateException if the ack count was not zero; the count is
         *         forced to zero regardless
         */
        public DelayedAcknowledgerImpl init() {
            int count = this.ackCountDown.getAndSet(0);
            if (count != 0) {
                if (UtlReferenceTracker.TYPE_TRACKING_ENABLED && this.refTracker != null) {
                    this.refTracker.onInit(count);
                }
                throw new IllegalStateException("DelayedAcknowledger initialized with non 0 ack count (" + count + ")");
            }
            this.status = null;
            this.busAcknowledger = null;
            return this;
        }

        /**
         * Arms the acknowledger with an ack count of 2.
         *
         * @return this acknowledger
         * @throws IllegalStateException if the ack count was not zero
         */
        public DelayedAcknowledgerImpl initAckCount() {
            if (!this.ackCountDown.compareAndSet(0, 2)) {
                if (UtlReferenceTracker.TYPE_TRACKING_ENABLED && this.refTracker != null) {
                    this.refTracker.dump();
                }
                throw new IllegalStateException("Attempt to initialized DelayedAcknowledger with non zero ack count " + this.ackCountDown.get());
            }
            return this;
        }

        public final DelayedAcknowledgerImpl setPool(UtlPool<DelayedAcknowledgerImpl> pool) {
            this.pool = pool;
            return this;
        }

        public final UtlPool<DelayedAcknowledgerImpl> getPool() {
            return this.pool;
        }

        /**
         * Records the bus acknowledger supplied by the executor bus processor and
         * consumes one ack count on its behalf.
         *
         * @param acknowledger the bus acknowledger to invoke once the count reaches zero
         */
        final void setBusAcknowledger(ExecutorBusProcessor.Acknowledger acknowledger) {
            if (tracer.debug) {
                // Uses the cached engine name: the descriptor is nulled by close().
                tracer.log("Delayed acknowledgment bus processing completed for " + DelayedAckControllerImpl.this.engineName, Tracer.Level.DEBUG);
            }
            this.busAcknowledger = acknowledger;
            this.acknowledge(null);
        }

        @Override
        public final void acknowledge() {
            this.acknowledge(null);
        }

        /**
         * Consumes one ack count. When the count reaches zero the bus acknowledger is
         * invoked with the first non-null status recorded across all calls, the pending
         * counter is decremented and, unless the controller is closed, this acknowledger
         * is returned to its pool.
         *
         * @param status failure to report, or {@code null} for success
         * @throws IllegalStateException if the count drops below zero
         */
        @Override
        public final void acknowledge(Exception status) {
            if (status != null && this.status == null) {
                this.status = status;
            }
            int count = this.ackCountDown.decrementAndGet();
            if (UtlReferenceTracker.TYPE_TRACKING_ENABLED && this.refTracker != null) {
                this.refTracker.onDispose(count);
            }
            if (count < 0) {
                throw new IllegalStateException("Attempt to acknowledge an already acknowledged delayed acknowledgment!");
            }
            if (count == 0) {
                if (tracer.debug) {
                    // Uses the cached engine name: the descriptor is nulled by close(),
                    // and acknowledgments may still arrive afterwards.
                    tracer.log("Releasing delayed acknowledgment for " + DelayedAckControllerImpl.this.engineName, Tracer.Level.DEBUG);
                }
                // Forward the recorded status, not the argument of this particular call:
                // the releasing call is frequently the bus processor's acknowledge(null),
                // which would otherwise discard a failure reported earlier.
                // NOTE(review): busAcknowledger is null here if the count reaches zero
                // before setBusAcknowledger ran (e.g. the caller acknowledges twice).
                this.busAcknowledger.acknowledge(this.status);
                DelayedAckControllerImpl.this.delayedAcksPendingCount.decrementAndGet();
                if (DelayedAckControllerImpl.this.state != State.Closed) {
                    this.pool.put((UtlPool.Item)this);
                }
            }
        }
    }

    /** Controller lifecycle: initial, after {@code initEngine}, after {@code close}. */
    private static enum State {
        Init,
        Started,
        Closed;

    }
}

