/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.perf.link;

import com.neeve.emx.EmxFactory;
import com.neeve.emx.IEmxDispatcher;
import com.neeve.link.ILnkClientEndpoint;
import com.neeve.link.ILnkEndpoint;
import com.neeve.link.ILnkEventHandler;
import com.neeve.link.ILnkPeerEndpoint;
import com.neeve.link.ILnkSTRRootEndpoint;
import com.neeve.link.LnkEvents;
import com.neeve.link.LnkFactory;
import com.neeve.perf.common.LatencyWriter;
import com.neeve.perf.common.SystemProperties;
import com.neeve.pkt.PktFactory;
import com.neeve.pkt.PktPacket;
import com.neeve.pkt.types.PktBodyData;
import com.neeve.tools.interactive.commands.AnnotatedCommand;
import com.neeve.util.UtlGovernor;
import com.neeve.util.UtlThread;
import java.text.DecimalFormat;

/**
 * Command-line sender for a link round-trip ("ping-pong") latency test.
 *
 * <p>The command connects to the endpoint named by the {@code descriptor} option, starts a
 * {@link Reader} thread that receives packets on that endpoint, and then sends packets at a
 * governed rate in two phases: a time-bounded warm-up followed by a count-bounded measured run.
 * Every outbound packet carries {@link System#nanoTime()} in the first eight bytes of its body;
 * the reader subtracts that value from its own {@code nanoTime()} reading when a packet arrives
 * (presumably the same packet echoed back by the remote peer - confirm against the receiver)
 * and hands the result to a {@link LatencyWriter}.
 *
 * <p>NOTE(review): the packet body is sized to {@code messageSize} and an 8-byte timestamp is
 * written at offset 0, so a message size below 8 is probably invalid; it is not validated here.
 */
@AnnotatedCommand.Command(keywords={"StreamingLinkSender"}, description="A sender for testing ping performance")
public final class PingPongSender
extends AnnotatedCommand {
    /*
     * The literals below were inlined by the compiler and recovered by the decompiler as bare
     * numbers. The names describe how this class uses them; the symbolic constants they came
     * from live in the link/packet libraries - TODO confirm and reference those instead.
     */

    /** Argument passed to {@code PktFactory.createPacket}; presumably the packet type id. */
    private static final int PACKET_TYPE = 257;
    /** Last argument passed to {@code ILnkPeerEndpoint.enque}; presumably a flags value. */
    private static final int ENQUE_FLAGS = 16;
    /** Event code whose payload is handled as an inbound {@link PktPacket}. */
    private static final int EVENT_PACKET = 5;
    /** Event code whose payload is handled as an {@link Exception} describing a link failure. */
    private static final int EVENT_FAILURE = 8;
    /** Body offset at which the send timestamp is stored. */
    private static final int TIMESTAMP_OFFSET = 0;

    /** Connection descriptor of the peer to connect to. */
    @AnnotatedCommand.Option(shortForm='d', longForm="descriptor", required=true, description="The connection descriptor to use e.g. tcp://192.168.1.7:12000&tcpnodelay=true")
    private String _descriptor;
    /** Length given to the packet body buffer. */
    @AnnotatedCommand.Option(shortForm='m', longForm="messageSize", defaultValue="256", required=true, description="The size of the message to stream")
    private int _messageSize;
    /** Number of packets sent in the measured run. */
    @AnnotatedCommand.Option(shortForm='t', longForm="testCount", defaultValue="300000", description="The test count")
    private int _testCount;
    /** Rate handed to the {@link UtlGovernor} for both the warm-up and the measured run. */
    @AnnotatedCommand.Option(shortForm='r', longForm="testRate", defaultValue="10000", description="The send rate")
    private int _testRate;
    /** Warm-up duration in seconds. */
    @AnnotatedCommand.Option(shortForm='w', longForm="warmupTime", defaultValue="2", description="The warm up time, in seconds")
    private int _warmupTime;
    /** Affinity mask applied to the thread running {@link #execute()}; {@code null} leaves it unpinned. */
    @AnnotatedCommand.Option(shortForm='a', longForm="writerCpuAffinityMask", description="which CPU(s) to affinitize the writer thread to")
    private String _writerCpuAffinityMask;
    /** Affinity mask applied to the {@link Reader} thread; {@code null} leaves it unpinned. */
    @AnnotatedCommand.Option(shortForm='b', longForm="readerCpuAffinityMask", description="which CPU(s) to affinitize the reader thread to")
    private String _readerCpuAffinityMask;
    /** When set, a throughput line is printed roughly once per second during the measured run. */
    @AnnotatedCommand.Option(shortForm='s', longForm="stats", defaultValue="false", description="Whether to output incremental throughput stats")
    private boolean _stats;
    /** When set, the measured round-trip time is halved before being recorded. */
    @AnnotatedCommand.Option(shortForm='o', longForm="oneWayLatency", description="whether to calculate one-way latency values")
    private boolean _oneWay;
    /** Forwarded to the {@link LatencyWriter} constructor. */
    @AnnotatedCommand.Option(shortForm='i', longForm="printIntervalStats", description="whether to output stats at periodic intervals instead of only at the end")
    private boolean _printIntervalStats;
    /** When set, no output file name is given to the {@link LatencyWriter}. */
    @AnnotatedCommand.Option(shortForm='f', longForm="dontWriteLatenciesToFile", description="whether to suppress writing latency values to a file")
    private boolean _dontWriteLatenciesToFile;

    /**
     * Converts a count observed over a span of milliseconds into a per-second rate.
     *
     * @param count         number of items observed
     * @param elapsedMillis length of the observation window in milliseconds
     * @return items per second, or {@code 0} when the window is empty or negative (this avoids
     *         the division by zero that a sub-millisecond run would otherwise trigger)
     */
    private static long ratePerSecond(int count, long elapsedMillis) {
        if (elapsedMillis <= 0L) {
            return 0L;
        }
        return (long)count * 1000L / elapsedMillis;
    }

    /**
     * Prints one throughput line: totals, interval rates, overall rates and the number of packets
     * still outstanding.
     *
     * @param numSent          packets sent since {@code start}
     * @param deltaNumSent     packets sent since {@code deltaStart}
     * @param numReceived      packets received since {@code start}
     * @param deltaNumReceived packets received since {@code deltaStart}
     * @param now              current time in milliseconds
     * @param start            start of the measured run in milliseconds
     * @param deltaStart       start of the current interval in milliseconds; a value of {@code 0}
     *                         (or less) reports the interval rates as {@code 0}
     */
    private void stats(int numSent, int deltaNumSent, int numReceived, int deltaNumReceived, long now, long start, long deltaStart) {
        boolean haveInterval = deltaStart > 0L;
        long intervalMillis = now - deltaStart;
        long overallMillis = now - start;
        long deltaSendRate = haveInterval ? PingPongSender.ratePerSecond(deltaNumSent, intervalMillis) : 0L;
        long overallSendRate = PingPongSender.ratePerSecond(numSent, overallMillis);
        long deltaReceiveRate = haveInterval ? PingPongSender.ratePerSecond(deltaNumReceived, intervalMillis) : 0L;
        long overallReceiveRate = PingPongSender.ratePerSecond(numReceived, overallMillis);
        int pending = numSent - numReceived;
        System.out.println("[PingPongSender] SEND [" + numSent + "," + deltaSendRate + "," + overallSendRate + "] RECV [" + numReceived + "," + deltaReceiveRate + "," + overallReceiveRate + "] PENDING [" + pending + "]");
    }

    /**
     * Blocks, polling every 100 ms, until the reader has counted at least {@code numSent} packets.
     *
     * <p>The wait is abandoned in two situations that would otherwise make it spin forever or
     * ignore a cancellation request: the reader thread has terminated (its count can no longer
     * advance), or the calling thread is interrupted (the interrupt flag is restored before
     * returning).
     *
     * @param numSent number of packets the caller has sent
     * @param reader  reader whose receive count is polled
     */
    private void waitForRoundTripsToComplete(int numSent, Reader reader) {
        while (numSent > reader.numReceived()) {
            if (!reader.isAlive()) {
                System.out.println("[PingPongSender] Reader thread is no longer running; giving up on pending round trips (sent=" + numSent + ", numRcvd=" + reader.numReceived() + ").");
                return;
            }
            System.out.println("[PingPongSender] Waiting for round trips to complete (sent=" + numSent + ", numRcvd=" + reader.numReceived() + ")...");
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller; sleeping again would only throw again.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Creates a client endpoint for the configured descriptor and drives {@code dispatcher} on
     * the calling thread until the connect-complete event has been delivered.
     *
     * @param dispatcher dispatcher on which the connect is posted and run
     * @return the connected peer endpoint
     * @throws Exception the failure reported by the connect event, or an
     *                   {@link IllegalStateException} if the event reports failure without a cause
     */
    private final ILnkPeerEndpoint connect(IEmxDispatcher dispatcher) throws Exception {
        ILnkClientEndpoint cep = LnkFactory.getInstance().createClientEndpoint(this._descriptor, null);
        System.out.println("[PingPongSender] Connecting to [" + this._descriptor + "]...");
        ConnectCompleteEventHandler handler = new ConnectCompleteEventHandler();
        cep.connectPost(dispatcher, handler, -1, 0);
        // The handler is invoked from within dispatcher.run(), so keep pumping until it fires.
        while (handler.eventData == null) {
            dispatcher.run(-1);
        }
        LnkEvents.ConnectAcceptCompleteEventData outcome = handler.eventData;
        if (outcome.status) {
            System.out.println("[PingPongSender] Connect success.");
            return outcome.pep;
        }
        if (outcome.e == null) {
            // 'throw null' would surface as an uninformative NullPointerException.
            throw new IllegalStateException("Connect to " + this._descriptor + " failed without a reported cause");
        }
        throw outcome.e;
    }

    /**
     * Echoes the effective option values to standard output.
     *
     * @param dfmt formatter used for the count and rate values
     */
    private void printConfiguration(DecimalFormat dfmt) {
        System.out.println("[PingPongSender] Descriptor:" + this._descriptor);
        System.out.println("[PingPongSender] Message size:" + this._messageSize);
        System.out.println("[PingPongSender] Test count:" + dfmt.format(this._testCount));
        System.out.println("[PingPongSender] Warmup time:" + this._warmupTime + "s");
        System.out.println("[PingPongSender] Test rate:" + dfmt.format(this._testRate));
        System.out.println("[PingPongSender] Writer CPU affinity mask:" + this._writerCpuAffinityMask);
        System.out.println("[PingPongSender] Reader CPU affinity mask:" + this._readerCpuAffinityMask);
        System.out.println("[PingPongSender] One Way Latency:" + this._oneWay);
        System.out.println("[PingPongSender] Print interval stats:" + this._printIntervalStats);
        System.out.println("[PingPongSender] Write latencies to file:" + !this._dontWriteLatenciesToFile);
    }

    /**
     * Stamps the packet body with the current {@link System#nanoTime()} and enqueues it.
     *
     * @param pep    endpoint to send on
     * @param packet packet to stamp and send (the same instance is reused for every send)
     * @throws Exception if the enqueue fails
     */
    private static void sendTimestamped(ILnkPeerEndpoint pep, PktPacket packet) throws Exception {
        packet.getBody().getBuffer().putLong(TIMESTAMP_OFFSET, System.nanoTime());
        pep.enque((short)-1, packet, null, ENQUE_FLAGS);
    }

    /**
     * Sends packets at the configured rate until the warm-up time has elapsed.
     *
     * @param pep    endpoint to send on
     * @param packet packet to send repeatedly
     * @return number of packets sent during warm-up
     * @throws Exception if sending fails
     */
    private int warmUp(ILnkPeerEndpoint pep, PktPacket packet) throws Exception {
        UtlGovernor governor = new UtlGovernor(this._testRate);
        long warmupMillis = (long)this._warmupTime * 1000L;
        long warmupStart = System.currentTimeMillis();
        int sent = 0;
        while (System.currentTimeMillis() - warmupStart < warmupMillis) {
            PingPongSender.sendTimestamped(pep, packet);
            ++sent;
            governor.blockToNext();
        }
        return sent;
    }

    /**
     * Runs the whole test: optional thread pinning, configuration dump, connect, warm-up,
     * measured run, and shutdown of the latency writer and the peer endpoint.
     *
     * <p>NOTE(review): the reader thread is never explicitly told to stop on normal completion;
     * it ends only when its handler observes a failure or error. Whether closing the endpoint
     * triggers that is not visible here.
     *
     * @throws Exception if connecting, sending or closing fails
     */
    public void execute() throws Exception {
        if (this._writerCpuAffinityMask != null) {
            System.out.println("[PingPongSender] Affinitizing writer thread to CPU " + this._writerCpuAffinityMask);
            UtlThread.setCPUAffinityMask((long)UtlThread.parseAffinityMask(this._writerCpuAffinityMask));
        }
        SystemProperties.dump();
        DecimalFormat dfmt = new DecimalFormat("#,###");
        this.printConfiguration(dfmt);

        // Connect on this thread, then hand the dispatcher over to the reader thread.
        IEmxDispatcher dispatcher = EmxFactory.getInstance().createDispatcher(EmxFactory.EmxImpl.DEFAULT, "StreamingReceiverDispatcher", null);
        ILnkPeerEndpoint pep = this.connect(dispatcher);
        Reader reader = new Reader(dispatcher, pep);
        reader.start();

        // A single packet instance is reused for every send; only its timestamp changes.
        PktPacket packet = PktFactory.getInstance().createPacket(PACKET_TYPE);
        ((PktBodyData)packet.getBody()).setBufferLength(this._messageSize);
        LatencyWriter lw = new LatencyWriter("nw-lat", this._dontWriteLatenciesToFile ? null : "latencies.send.bin", this._printIntervalStats);

        // Warm-up phase: latencies are not recorded because the reader has no writer yet.
        System.out.println("[PingPongSender] Warming up (" + this._warmupTime + " seconds)...");
        int warmupSent = this.warmUp(pep, packet);
        this.waitForRoundTripsToComplete(warmupSent, reader);

        // Measured phase.
        System.out.println("[PingPongSender] Running test (rate=" + dfmt.format(this._testRate) + ", count=" + dfmt.format(this._testCount) + ")...");
        int numSent = 0;
        reader.reset().setLatencyWriter(lw);
        UtlGovernor governor = new UtlGovernor(this._testRate);
        int deltaNumSent = 0;
        long start = System.currentTimeMillis();
        long deltaStart = start;
        lw.start(this._testRate, this._testCount);
        for (int i = 0; i < this._testCount; ++i) {
            PingPongSender.sendTimestamped(pep, packet);
            ++numSent;
            if (this._stats) {
                ++deltaNumSent;
                long now = System.currentTimeMillis();
                if (now - deltaStart >= 1000L) {
                    // Report the interval that just ended, then open a new one.
                    this.stats(numSent, deltaNumSent, reader.numReceived(), reader.deltaNumReceived(), now, start, deltaStart);
                    deltaStart = now;
                    deltaNumSent = 0;
                    reader.clearDeltaNumReceived();
                }
            }
            governor.blockToNext();
        }
        long stop = System.currentTimeMillis();
        this.waitForRoundTripsToComplete(numSent, reader);
        lw.close();
        System.out.println("[PingPongSender] Test complete.");
        if (this._stats) {
            // deltaStart of 0 suppresses the interval rates in the final summary line.
            this.stats(numSent, deltaNumSent, reader.numReceived(), reader.deltaNumReceived(), stop, start, 0L);
        }
        pep.close((short)-1);
        if (!this._dontWriteLatenciesToFile) {
            System.out.println("[PingPongSender] Test complete (run rumi-reporter on latencies.send.bin to calculate latency stats).");
        } else {
            System.out.println("[PingPongSender] Test complete.");
        }
    }

    /**
     * Program entry point: enables the {@code nv.enablecpuaffinitymasks} system property and runs
     * the command with the given arguments.
     *
     * @param args command-line arguments
     * @throws Exception if the command fails
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("nv.enablecpuaffinitymasks", "true");
        new PingPongSender().run(args);
    }

    /**
     * Thread that takes ownership of the dispatcher, joins the peer endpoint with an
     * {@link EventHandler}, starts reading, and pumps the dispatcher until a failure is seen.
     *
     * <p>The receive counters and the latency writer reference are read and written by the
     * sending thread (via the accessors below) while being used by the event handler, so they are
     * {@code volatile} to make updates visible across threads. The increments are not atomic with
     * respect to {@link #clearDeltaNumReceived()}; an interval count can therefore be off by a
     * packet, which only affects the informational stats line.
     */
    private final class Reader
    extends Thread {
        private final IEmxDispatcher _dispatcher;
        private final ILnkPeerEndpoint _pep;
        private volatile LatencyWriter _lw;
        private volatile int _numReceived;
        private volatile int _deltaNumReceived;
        /** Set by the event handler to end the dispatch loop in {@link #run()}. */
        private boolean _done;

        /**
         * @param dispatcher dispatcher this thread will own and run
         * @param pep        connected peer endpoint to read from
         */
        Reader(IEmxDispatcher dispatcher, ILnkPeerEndpoint pep) {
            this._dispatcher = dispatcher;
            this._pep = pep;
        }

        /**
         * Zeroes both receive counters.
         *
         * @return this reader, for chaining
         */
        final Reader reset() {
            this._deltaNumReceived = 0;
            this._numReceived = 0;
            return this;
        }

        /** @return packets received since construction or the last {@link #reset()} */
        final int numReceived() {
            return this._numReceived;
        }

        /** @return packets received since the interval counter was last cleared */
        final int deltaNumReceived() {
            return this._deltaNumReceived;
        }

        /** Starts a new stats interval by zeroing the interval counter. */
        final void clearDeltaNumReceived() {
            this._deltaNumReceived = 0;
        }

        /**
         * Sets the writer that receives measured latencies; while it is {@code null}, latencies
         * are computed but not recorded.
         *
         * @param lw latency writer to use
         */
        final void setLatencyWriter(LatencyWriter lw) {
            this._lw = lw;
        }

        /**
         * Thread body: optional CPU pinning, endpoint join, then the dispatch loop. Any throwable
         * is printed and ends the thread.
         */
        @Override
        public final void run() {
            try {
                this._dispatcher.setOwner();
                String affinityMask = PingPongSender.this._readerCpuAffinityMask;
                if (affinityMask != null) {
                    System.out.println("[PingPongSender] Affinitizing reader thread to CPU " + affinityMask);
                    UtlThread.setCPUAffinityMask((long)UtlThread.parseAffinityMask(affinityMask));
                }
                this._pep.join((short)-1, new EventHandler());
                System.out.println("[PingPongSender] Receiving packets...");
                ((ILnkSTRRootEndpoint)this._pep.getRootEndpoint()).startRead(this._dispatcher, 0);
                while (!this._done) {
                    this._dispatcher.run(-1);
                }
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
        }

        /**
         * Link event handler for the joined endpoint: measures latency for packet events and
         * stops the reader on failure events.
         */
        private final class EventHandler
        implements ILnkEventHandler {
            /**
             * Handles one link event.
             *
             * @param dispatcher dispatcher delivering the event (unused)
             * @param ep         endpoint the event relates to
             * @param event      event code
             * @param data       event payload: a {@link PktPacket} for packet events, an
             *                   {@link Exception} for failure events
             * @throws InternalError for any other event code
             */
            public final void onEvent(IEmxDispatcher dispatcher, ILnkEndpoint ep, int event, Object data) {
                switch (event) {
                    case EVENT_PACKET: {
                        try {
                            // Take the receive timestamp first so bookkeeping is not measured.
                            long receivedAt = System.nanoTime();
                            PktPacket packet = (PktPacket)data;
                            long sentAt = packet.getBody().getBuffer().getLong(TIMESTAMP_OFFSET);
                            long divisor = PingPongSender.this._oneWay ? 2 : 1;
                            int latency = (int)((receivedAt - sentAt) / divisor);
                            LatencyWriter writer = Reader.this._lw;
                            if (writer != null) {
                                writer.write(latency);
                            }
                            packet.dispose();
                            Reader.this._numReceived++;
                            Reader.this._deltaNumReceived++;
                        }
                        catch (Throwable e) {
                            e.printStackTrace();
                            Reader.this._done = true;
                        }
                        break;
                    }
                    case EVENT_FAILURE: {
                        System.out.println("[PingPongSender] Link (" + ep.toString() + ") failure [" + ((Exception)data).toString() + "]");
                        Reader.this._done = true;
                        break;
                    }
                    default: {
                        throw new InternalError("Received event [type=" + event + " data=" + data + "]!");
                    }
                }
            }
        }
    }

    /**
     * Event handler passed to {@code connectPost}; it stores the payload of whatever event it
     * receives so that {@link PingPongSender#connect(IEmxDispatcher)} can inspect the outcome.
     */
    private static final class ConnectCompleteEventHandler
    implements ILnkEventHandler {
        /** Payload of the received event; {@code null} until an event has been delivered. */
        LnkEvents.ConnectAcceptCompleteEventData eventData;

        private ConnectCompleteEventHandler() {
        }

        /**
         * Records the event payload.
         *
         * @param dispatcher dispatcher delivering the event (unused)
         * @param ep         endpoint the event relates to (unused)
         * @param event      event code (not inspected)
         * @param data       payload, cast to {@link LnkEvents.ConnectAcceptCompleteEventData}
         */
        public final void onEvent(IEmxDispatcher dispatcher, ILnkEndpoint ep, int event, Object data) {
            this.eventData = (LnkEvents.ConnectAcceptCompleteEventData)data;
        }
    }
}

