/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.perf.link;

import com.neeve.emx.EmxFactory;
import com.neeve.emx.IEmxNwLnkClientEndpoint;
import com.neeve.emx.IEmxNwLnkPeerEndpoint;
import com.neeve.io.IOBuffer;
import com.neeve.perf.common.SystemProperties;
import com.neeve.stats.StatsLatencyWriter;
import com.neeve.tools.interactive.commands.AnnotatedCommand;
import com.neeve.util.UtlThread;
import com.neeve.util.UtlTime;
import java.text.DecimalFormat;

@AnnotatedCommand.Command(keywords={"RdmaStreamingSender"}, description="A blocking sender to test RDMA streaming sends")
public class RdmaStreamingSender
extends AnnotatedCommand {
    @AnnotatedCommand.Option(shortForm=100, longForm="descriptor", required=true, description="The connection descriptor to use e.g. rdma://192.168.1.7:12000")
    private String _descriptor;
    @AnnotatedCommand.Option(shortForm=109, longForm="messageSize", defaultValue="256", required=true, description="The size of the message to stream")
    private int _messageSize;
    @AnnotatedCommand.Option(shortForm=116, longForm="testCount", defaultValue="100000000", description="The test count")
    private int _testCount;
    @AnnotatedCommand.Option(shortForm=114, longForm="testRate", defaultValue="1000000", description="The send rate")
    private int _testRate;
    @AnnotatedCommand.Option(shortForm=99, longForm="cpuAffinityMask", description="which CPU(s) to affinitize the sending thread to")
    private String _cpuAffinityMask;
    @AnnotatedCommand.Option(shortForm=111, longForm="oneWayLatency", description="whether to calculate one-way latency values")
    private boolean _oneWay;
    @AnnotatedCommand.Option(shortForm=105, longForm="printIntervalStats", description="whether to output stats at periodic intervals instead of only at the end")
    private boolean _printIntervalStats;
    @AnnotatedCommand.Option(shortForm=102, longForm="dontWriteLatenciesToFile", description="whether to suppress writing latency values to a file")
    private boolean _dontWriteLatenciesToFile;

    public final void execute() throws Exception {
        if (this._cpuAffinityMask != null) {
            System.out.println("[RdmaStreamingSender] Affinitizing thread to CPU " + this._cpuAffinityMask);
            UtlThread.setCPUAffinityMask((long)UtlThread.parseAffinityMask((String)this._cpuAffinityMask));
        }
        SystemProperties.dump();
        DecimalFormat dfmt = new DecimalFormat("#,###");
        System.out.println("[RdmaStreamingSender] Message size:" + this._messageSize);
        System.out.println("[RdmaStreamingSender] Test count:" + dfmt.format(this._testCount));
        System.out.println("[RdmaStreamingSender] Test rate:" + dfmt.format(this._testRate));
        System.out.println("[RdmaStreamingSender] CPU affinity mask:" + this._cpuAffinityMask);
        System.out.println("[RdmaStreamingSender] One way latency:" + this._oneWay);
        System.out.println("[RdmaStreamingSender] Print interval stats:" + this._printIntervalStats);
        System.out.println("[RdmaStreamingSender] Write latencies to file:" + !this._dontWriteLatenciesToFile);
        System.out.println("[RdmaStreamingSender] Establishing link...");
        IEmxNwLnkClientEndpoint cep = EmxFactory.getInstance().createNwLnkClientEndpoint(EmxFactory.EmxImpl.UCX, this._descriptor, null);
        IEmxNwLnkPeerEndpoint pep = cep.connect();
        System.out.println("[RdmaStreamingSender] Creating write buffer...");
        IOBuffer writeBuffer = IOBuffer.create((int)this._messageSize);
        System.out.println("[RdmaStreamingSender] Calculating nanoTime overhead...");
        long nanoTimeOverhead = 0L;
        long start = UtlTime.now();
        int i = 0;
        while ((long)i < 100000000L) {
            UtlTime.now();
            ++i;
        }
        nanoTimeOverhead = (UtlTime.now() - start) / 100000000L;
        System.out.println("[RdmaStreamingSender] ..." + nanoTimeOverhead + "ns");
        StatsLatencyWriter lw = new StatsLatencyWriter("nw-write", this._dontWriteLatenciesToFile ? null : "latencies.write.bin", this._printIntervalStats);
        System.out.println("[RdmaStreamingSender] Running test (" + this._testCount + " messages)...");
        System.out.println("[RdmaStreamingSender] ...BufferSize=" + writeBuffer.getLength());
        int i2 = 0;
        long nanosPerMsg = this._testRate > 0 ? 1000000000L / (long)this._testRate : 0L;
        long next = UtlTime.now() + nanosPerMsg;
        lw.start(this._testRate, this._testCount);
        while (i2 < this._testCount) {
            long ts1 = UtlTime.now();
            if (ts1 < next) continue;
            pep.write(writeBuffer, 0, writeBuffer.getLength());
            long ts2 = UtlTime.now();
            int latency = (int)(ts2 - ts1 - nanoTimeOverhead);
            if (this._oneWay) {
                latency /= 2;
            }
            lw.write(latency);
            ++i2;
            next += nanosPerMsg;
        }
        lw.close();
        if (!this._dontWriteLatenciesToFile) {
            System.out.println("[RdmaStreamingSender] Test complete (run rumi-reporter on latencies.write.bin to calculate latency stats).");
        } else {
            System.out.println("[RdmaStreamingSender] Test complete.");
        }
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("nv.tuning.cpu.enableaffinitymasks", "true");
        new RdmaStreamingSender().run(args);
    }
}

