/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.perf.link;

import com.neeve.emx.EmxNwLnk;
import com.neeve.emx.EmxNwLnkConnector;
import com.neeve.io.IOBuffer;
import com.neeve.perf.common.SystemProperties;
import com.neeve.stats.StatsLatencyWriter;
import com.neeve.tools.interactive.commands.AnnotatedCommand;
import com.neeve.util.UtlThread;
import com.neeve.util.UtlTime;
import java.text.DecimalFormat;

/**
 * Command-line tool that connects to a peer over an EMX network link and streams
 * fixed-size buffers using blocking writes, recording how long each write call takes.
 *
 * <p>Messages are batched: each write sends one buffer holding a whole number of
 * messages, and the configured message rate/count are converted into a buffer
 * rate/count before the timed run starts.
 */
@AnnotatedCommand.Command(keywords={"BlockingStreamingSender"}, description="A blocking sender to test streaming performance using EMX links")
public class BlockingStreamingSender extends AnnotatedCommand {
    /** Prefix put in front of every console line printed by this command. */
    private static final String TAG = "[BlockingStreamingSender] ";
    /** Number of back-to-back {@code UtlTime.now()} calls used to estimate the cost of one call. */
    private static final long TIMESTAMP_CALIBRATION_CALLS = 100000000L;

    @AnnotatedCommand.Option(shortForm=100, longForm="descriptor", required=true, description="The connection descriptor to use e.g. tcp://192.168.1.7:12000&localifaddr=192.168.1.8&localport=12000&tcpnodelay=true")
    private String _descriptor;
    @AnnotatedCommand.Option(shortForm=109, longForm="messageSize", defaultValue="256", required=true, description="The size of the message to stream")
    private int _messageSize;
    @AnnotatedCommand.Option(shortForm=98, longForm="bufferSize", defaultValue="256", required=true, description="The size of the connection's write buffer size")
    private int _bufferSize;
    @AnnotatedCommand.Option(shortForm=116, longForm="testCount", defaultValue="100000000", description="The test count")
    private int _testCount;
    @AnnotatedCommand.Option(shortForm=114, longForm="testRate", defaultValue="10000000", description="The send rate")
    private int _testRate;
    @AnnotatedCommand.Option(shortForm=119, longForm="warmupTime", defaultValue="2", description="The warm up time, in seconds")
    private int _warmupTime;
    @AnnotatedCommand.Option(shortForm=99, longForm="cpuAffinityMask", description="which CPU(s) to affinitize the sending thread to")
    private String _cpuAffinityMask;
    @AnnotatedCommand.Option(shortForm=105, longForm="printIntervalStats", description="whether to output stats at periodic intervals instead of only at the end")
    private boolean _printIntervalStats;
    @AnnotatedCommand.Option(shortForm=102, longForm="dontWriteLatenciesToFile", description="whether to suppress writing latency values to a file")
    private boolean _dontWriteLatenciesToFile;

    /**
     * Runs the complete benchmark: validates the options, prints the configuration,
     * connects the link, calibrates the timestamp overhead, warms up, performs the
     * rate-paced timed run and finally prints the achieved message rate.
     *
     * @throws IllegalArgumentException if the message size or buffer size is not positive
     * @throws Exception if connecting, writing to, or closing the link or the latency writer fails
     */
    public void execute() throws Exception {
        // Reject sizes that would otherwise surface later as a division by zero
        // (or a send loop that never advances) after the link is already connected.
        validateOptions();

        if (this._cpuAffinityMask != null) {
            System.out.println(TAG + "Affinitizing thread to CPU " + this._cpuAffinityMask);
            UtlThread.setCPUAffinityMask((long)UtlThread.parseAffinityMask(this._cpuAffinityMask));
        }
        SystemProperties.dump();
        DecimalFormat dfmt = new DecimalFormat("#,###");
        printConfiguration(dfmt);

        System.out.println(TAG + "Establishing link...");
        EmxNwLnkConnector connector = EmxNwLnkConnector.create(this._descriptor);
        EmxNwLnk link = connector.connect();

        long numSent;
        long startMillis;
        long stopMillis;
        try {
            System.out.println(TAG + "Configuring link (message size=" + this._messageSize + ")...");
            link.configureBlockingWrite(true);

            // Round up so that the write buffer always holds a whole number of messages.
            int messagesPerBuffer = this._bufferSize / this._messageSize + (this._bufferSize % this._messageSize > 0 ? 1 : 0);
            IOBuffer writeBuffer = IOBuffer.create(this._messageSize * messagesPerBuffer);

            long nanoTimeOverhead = measureTimestampOverhead();

            StatsLatencyWriter lw = new StatsLatencyWriter("nw-write", this._dontWriteLatenciesToFile ? null : "latencies.write.bin", this._printIntervalStats);

            warmUp(link, writeBuffer);

            // The pacing below works in units of buffers, not individual messages.
            int bufferRate = this._testRate / messagesPerBuffer;
            int bufferCount = this._testCount / messagesPerBuffer;
            // A non-positive buffer rate yields a zero interval, i.e. an unthrottled run.
            long nanosPerBuffer = bufferRate > 0 ? 1000000000L / (long)bufferRate : 0L;
            System.out.println(TAG + "Running test (" + dfmt.format(this._testCount) + " messages)...");
            System.out.println(TAG + "...BufferSize=" + writeBuffer.getLength() + " (" + messagesPerBuffer + " msgs/buffer)");
            System.out.println(TAG + "...BufferCount=" + dfmt.format(bufferCount));
            System.out.println(TAG + "...BufferRate=" + dfmt.format(bufferRate) + "/sec");

            long firstDeadline = UtlTime.now() + nanosPerBuffer;
            lw.start(bufferRate, bufferCount);
            try {
                startMillis = System.currentTimeMillis();
                numSent = sendPaced(link, writeBuffer, lw, messagesPerBuffer, nanosPerBuffer, firstDeadline, nanoTimeOverhead);
                stopMillis = System.currentTimeMillis();
            } finally {
                // Close the started writer even when a write fails part-way through the run.
                lw.close();
            }
        } finally {
            // Always release the connection, including on warm-up or send failures.
            link.close();
        }

        // A very short run can finish within the same millisecond; clamp the elapsed
        // time so the rate computation cannot divide by zero.
        long elapsedMillis = Math.max(1L, stopMillis - startMillis);
        long overallRate = numSent * 1000L / elapsedMillis;
        System.out.println(TAG + "Sent " + dfmt.format(numSent) + " messages at " + dfmt.format(overallRate) + " msgs/sec.");
        if (!this._dontWriteLatenciesToFile) {
            System.out.println(TAG + "Test complete (run rumi-reporter on latencies.write.bin to calculate latency stats).");
        } else {
            System.out.println(TAG + "Test complete.");
        }
    }

    /**
     * Checks the size options that are used as divisors or loop increments.
     *
     * @throws IllegalArgumentException if the message size or buffer size is not positive
     */
    private void validateOptions() {
        if (this._messageSize <= 0) {
            throw new IllegalArgumentException("messageSize must be greater than 0 (was " + this._messageSize + ")");
        }
        if (this._bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be greater than 0 (was " + this._bufferSize + ")");
        }
    }

    /**
     * Prints the effective option values to standard output.
     *
     * @param dfmt formatter used for the count and rate values
     */
    private void printConfiguration(DecimalFormat dfmt) {
        System.out.println(TAG + "Descriptor:" + this._descriptor);
        System.out.println(TAG + "Message size:" + this._messageSize);
        System.out.println(TAG + "Buffer size:" + this._bufferSize);
        System.out.println(TAG + "Test count:" + dfmt.format(this._testCount));
        System.out.println(TAG + "Warmup time:" + this._warmupTime + "s");
        System.out.println(TAG + "Test rate:" + dfmt.format(this._testRate));
        System.out.println(TAG + "CPU affinity mask:" + this._cpuAffinityMask);
        System.out.println(TAG + "Print interval stats:" + this._printIntervalStats);
        System.out.println(TAG + "Write latencies to file:" + !this._dontWriteLatenciesToFile);
    }

    /**
     * Estimates the average cost of a single {@code UtlTime.now()} call by timing a
     * long run of consecutive calls, and prints the result.
     *
     * @return the average per-call cost, in the unit returned by {@code UtlTime.now()}
     *         (printed and used by this class as nanoseconds)
     */
    private static long measureTimestampOverhead() {
        System.out.println(TAG + "Calculating UtlTime.now() overhead...");
        long calibrationStart = UtlTime.now();
        for (long i = 0L; i < TIMESTAMP_CALIBRATION_CALLS; ++i) {
            UtlTime.now();
        }
        long overhead = (UtlTime.now() - calibrationStart) / TIMESTAMP_CALIBRATION_CALLS;
        System.out.println(TAG + "..." + overhead + "ns");
        return overhead;
    }

    /**
     * Writes the buffer to the link back-to-back, without pacing or latency
     * recording, until the configured warm-up time has elapsed.
     *
     * @param link the connected link to write to
     * @param writeBuffer the buffer sent on every write
     * @throws Exception if a write fails
     */
    private void warmUp(EmxNwLnk link, IOBuffer writeBuffer) throws Exception {
        System.out.println(TAG + "Warming up (" + this._warmupTime + " seconds)...");
        long warmupMillis = (long)this._warmupTime * 1000L;
        long warmupStart = System.currentTimeMillis();
        while (System.currentTimeMillis() - warmupStart < warmupMillis) {
            link.write(writeBuffer, 0, writeBuffer.getLength());
        }
    }

    /**
     * Performs the timed run: busy-waits until each send deadline, writes one buffer,
     * and records the duration of the write call minus the timestamp overhead.
     *
     * @param link the connected link to write to
     * @param writeBuffer the buffer sent on every write
     * @param lw the latency writer that receives one sample per write
     * @param messagesPerBuffer number of messages accounted for by each write
     * @param nanosPerBuffer interval between consecutive send deadlines; 0 means unthrottled
     * @param firstDeadline timestamp at or after which the first buffer may be sent
     * @param nanoTimeOverhead estimated cost of one timestamp call, subtracted from each sample
     * @return the number of messages sent; this is the first multiple of
     *         {@code messagesPerBuffer} that reaches the configured test count
     * @throws Exception if a write fails
     */
    private long sendPaced(EmxNwLnk link, IOBuffer writeBuffer, StatsLatencyWriter lw, int messagesPerBuffer,
                           long nanosPerBuffer, long firstDeadline, long nanoTimeOverhead) throws Exception {
        // Counted as a long so that a test count close to Integer.MAX_VALUE cannot wrap
        // the counter negative and keep the loop running indefinitely.
        long sent = 0L;
        long deadline = firstDeadline;
        while (sent < (long)this._testCount) {
            long beforeWrite = UtlTime.now();
            if (beforeWrite < deadline) {
                // Spin rather than sleep so the send happens as close to the deadline as possible.
                continue;
            }
            link.write(writeBuffer, 0, writeBuffer.getLength());
            long afterWrite = UtlTime.now();
            int writeTime = (int)(afterWrite - beforeWrite - nanoTimeOverhead);
            lw.write(writeTime);
            deadline += nanosPerBuffer;
            sent += messagesPerBuffer;
        }
        return sent;
    }

    /**
     * Program entry point: sets the {@code nv.tuning.cpu.enableaffinitymasks} system
     * property to {@code true} and hands the arguments to the command runner.
     *
     * @param args the command-line arguments
     * @throws Exception if the command fails
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("nv.tuning.cpu.enableaffinitymasks", "true");
        new BlockingStreamingSender().run(args);
    }
}

