/*
 * Decompiled with CFR 0.152.
 */
package com.neeve.perf.persist;

import com.eaio.uuid.UUID;
import com.neeve.io.IOBuffer;
import com.neeve.ods.IStoreBinding;
import com.neeve.ods.IStoreObject;
import com.neeve.ods.StoreBinding;
import com.neeve.perf.serialization.MessageFactory;
import com.neeve.pkt.PktFactory;
import com.neeve.pkt.PktPacket;
import com.neeve.pkt.log.PktRecoveryLog;
import com.neeve.rog.IRogMessage;
import com.neeve.stats.StatsLatencyWriter;
import com.neeve.util.UtlThread;
import jargs.gnu.CmdLineParser;
import java.io.IOException;
import java.text.DecimalFormat;

public final class PacketLogger {
    private final PktRecoveryLog _log;
    private final DecimalFormat _dfmt = new DecimalFormat("#,###");

    /**
     * Builds the recovery log instance that this benchmark writes to and reads from.
     * The log is configured here but only opened later, from {@code run}.
     *
     * @param logLocation            directory in which the {@code perf.log} file is placed
     * @param initialLogLength       preallocated log length, in gigabytes
     * @param zeroOutInitial         forwarded to {@code setZeroOutInitial}
     * @param pageSize               forwarded to {@code setPageSize}
     * @param readBufferSize         forwarded to {@code setReadBufferSize}
     * @param writeBufferSize        forwarded to {@code setWriteBufferSize}
     * @param flushUsingMappedMemory forwarded to {@code setFlushUsingMappedMemory}
     * @param flushDirectFromPacket  forwarded to {@code setFlushDirectFromPacket}
     * @throws Exception if the log cannot be created or configured
     */
    PacketLogger(String logLocation, int initialLogLength, boolean zeroOutInitial, int pageSize, int readBufferSize, int writeBufferSize, boolean flushUsingMappedMemory, boolean flushDirectFromPacket) throws Exception {
        System.out.println("Creating log...");
        final long createStart = System.nanoTime();
        // Widen to long before scaling so multi-gigabyte lengths do not overflow int.
        final long initialLengthBytes = (long)initialLogLength * 1024L * 1024L * 1024L;
        this._log = PktRecoveryLog.create((String)logLocation, (String)"perf.log")
            .setOpenMode(PktRecoveryLog.FileOpenMode.rw)
            .setInitialLength(initialLengthBytes)
            .setZeroOutInitial(zeroOutInitial)
            .setPageSize(pageSize)
            .setReadBufferSize(readBufferSize)
            .setWriteBufferSize(writeBufferSize)
            .setFlushUsingMappedMemory(flushUsingMappedMemory)
            .setFlushDirectFromPacket(flushDirectFromPacket);
        final long elapsedMicros = (System.nanoTime() - createStart) / 1000L;
        System.out.println("Created log in " + this._dfmt.format(elapsedMicros) + " us");
    }

    /**
     * Creates the message that is serialized into the packet on every write.
     *
     * @return the object produced by {@code MessageFactory("rumi.protobuf").createCar(true)},
     *         viewed as an {@link IRogMessage}
     */
    private final IRogMessage createMessage() {
        final MessageFactory factory = new MessageFactory("rumi.protobuf");
        return (IRogMessage)factory.createCar(true);
    }

    /**
     * Allocates the single packet instance that is reused for every write.
     *
     * @return a packet of type 257 obtained from the shared {@link PktFactory}
     *         (NOTE(review): the meaning of packet type 257 is not visible here - confirm)
     */
    private final PktPacket createPacket() {
        final PktFactory factory = PktFactory.getInstance();
        return factory.createPacket(257);
    }

    /**
     * Serializes {@code message} into {@code packet} and optionally fills in the store
     * packet header before syncing the packet.
     *
     * @param message                     the message to serialize
     * @param packet                      the packet to populate (mutated in place)
     * @param populateStorePacketMetadata when {@code true}, the store packet header is
     *                                    prepared from the message's identifiers
     * @return the same {@code packet} instance that was passed in
     */
    private final PktPacket populatePacket(IRogMessage message, PktPacket packet, boolean populateStorePacketMetadata) {
        message.serializeTo(packet);
        if (populateStorePacketMetadata) {
            // Header fields are read from the message in the same order the call consumes them.
            final UUID oid = (UUID)message.getOid();
            final short ofid = (short)message.getOfid();
            final short objectType = (short)message.getObjectType();
            final long transactionId = (long)message.getTransactionId();
            final long stableTransactionId = (long)message.getStableTransactionId();
            final long checkpointVersion = (long)message.getCheckpointVersion();
            final IStoreObject.EncodingType encoding = (IStoreObject.EncodingType)message.getContentEncodingType();
            StoreBinding.prepareStorePacketHeader(
                (IStoreObject)message,
                (PktPacket)packet,
                (IStoreBinding.Operation)IStoreBinding.Operation.Send,
                oid,
                ofid,
                objectType,
                transactionId,
                stableTransactionId,
                checkpointVersion,
                encoding,
                (boolean)false,
                (boolean)true,
                null);
        }
        packet.sync();
        return packet;
    }

    /**
     * Writes {@code count} packets to the log, optionally rate limited, recording
     * per-operation latencies and printing the post-warm-up throughput at the end.
     *
     * <p>The loop busy-spins on {@link System#nanoTime()}: a packet is written whenever the
     * current time has reached the next scheduled send time. With {@code rate <= 0} the
     * schedule interval is zero, so a packet is written on every iteration.
     *
     * @param count                       number of packets to write
     * @param warmupTime                  warm-up period in seconds; packets written before it
     *                                    elapses are excluded from the throughput figure
     * @param rate                        target packets per second; {@code <= 0} means unlimited
     * @param sync                        passed to the log as flag {@code 1} (true) or {@code 0}
     *                                    on write and flush
     * @param supportTailing              not used by this method
     * @param populateStorePacketMetadata forwarded to {@code populatePacket}
     * @param flushAfter                  explicit flush every this many writes; {@code <= 0}
     *                                    disables explicit flushes
     * @param nanoTimeOverhead            measured cost of one {@code nanoTime()} call, subtracted
     *                                    from every recorded latency
     * @param noLatencyWrites             when {@code true}, latencies are not written to files
     * @param printIntervalStats          forwarded to each {@link StatsLatencyWriter}
     * @throws Exception if writing or flushing the log fails
     */
    private final void write(int count, int warmupTime, int rate, boolean sync, boolean supportTailing, boolean populateStorePacketMetadata, int flushAfter, long nanoTimeOverhead, boolean noLatencyWrites, boolean printIntervalStats) throws Exception {
        IRogMessage message = this.createMessage();
        PktPacket packet = this.createPacket();
        StatsLatencyWriter prepTimes = new StatsLatencyWriter("prep", noLatencyWrites ? null : "latencies.prep.bin", true, printIntervalStats, false);
        StatsLatencyWriter writeTimes = new StatsLatencyWriter("write", noLatencyWrites ? null : "latencies.write.bin", false, printIntervalStats, false);
        StatsLatencyWriter flushTimes = new StatsLatencyWriter("flush", noLatencyWrites ? null : "latencies.flush.bin", false, printIntervalStats, false);
        StatsLatencyWriter totalTimes = new StatsLatencyWriter("total", noLatencyWrites ? null : "latencies.total.bin", false, printIntervalStats, false);
        System.out.println("Writing...");
        int i = 0;
        long start = System.nanoTime();
        long nanosPerMsg = rate > 0 ? 1000000000L / (long)rate : 0L;
        long next = start + nanosPerMsg;
        boolean warmupCompleted = false;
        int postWarmupCount = 0;
        long postWarmupStart = 0L;
        // With explicit flushing disabled the countdown starts so high it never reaches zero.
        long numToFlush = flushAfter <= 0 ? Long.MAX_VALUE : (long)flushAfter;
        int numFlushes = 0;
        prepTimes.start(rate, count);
        writeTimes.start(rate, count);
        flushTimes.start(rate, count);
        totalTimes.start(rate, count);
        while (i < count) {
            long current = System.nanoTime();
            if (current >= next) {
                long t0 = System.nanoTime();
                this.populatePacket(message, packet, populateStorePacketMetadata);
                long t1 = System.nanoTime();
                int prepTime = (int)(t1 - t0 - nanoTimeOverhead);
                this._log.write(packet, sync ? 1 : 0);
                int writeTime = (int)(System.nanoTime() - t1 - nanoTimeOverhead);
                // -1 marks "no flush happened on this iteration".
                int flushTime = -1;
                if (--numToFlush == 0L) {
                    t1 = System.nanoTime();
                    this._log.flush(sync ? 1 : 0);
                    flushTime = (int)(System.nanoTime() - t1 - nanoTimeOverhead);
                    ++numFlushes;
                    numToFlush = flushAfter;
                }
                prepTimes.write(prepTime);
                writeTimes.write(writeTime);
                if (flushTime >= 0) {
                    flushTimes.write(flushTime);
                    totalTimes.write(prepTime + flushTime + writeTime);
                } else {
                    totalTimes.write(prepTime + writeTime);
                }
                next += nanosPerMsg;
                ++i;
                if (warmupCompleted) {
                    ++postWarmupCount;
                }
            }
            if (!warmupCompleted && current - start > (long)warmupTime * 1000000000L) {
                System.out.println("Warm up complete.");
                postWarmupStart = System.nanoTime();
                warmupCompleted = true;
            }
        }
        long stop = System.nanoTime();
        prepTimes.stop();
        writeTimes.stop();
        flushTimes.stop();
        totalTimes.stop();
        prepTimes.close(false);
        writeTimes.close(false);
        // The flush writer is only closed when at least one flush latency was recorded.
        if (numFlushes > 0) {
            flushTimes.close(false);
        }
        totalTimes.close(false);
        // NOTE(review): finish() is only invoked on prepTimes in the original code;
        // presumably it is a one-off finalization step - confirm against StatsLatencyWriter.
        prepTimes.finish();
        // Guard the division: if warm-up completed on the very last iteration, stop and
        // postWarmupStart can be equal on a coarse timer, which used to throw
        // ArithmeticException. If warm-up never completed there is nothing to rate.
        long postWarmupElapsed = stop - postWarmupStart;
        int overallRate = warmupCompleted && postWarmupElapsed > 0L ? (int)((long)postWarmupCount * 1000000000L / postWarmupElapsed) : 0;
        System.out.println("Wrote " + this._dfmt.format(postWarmupCount) + " packets @ " + this._dfmt.format(overallRate) + " pkts/sec post warmup.");
        System.out.println("Write complete (run rumi-reporter on latencies.*.bin to calculate latency stats)");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads packets back through a memory mapped reader until the processor has counted
     * {@code count} packets, then prints the read throughput.
     *
     * @param count                   number of packets to wait for
     * @param warmupTime              warm-up period in seconds, forwarded to the processor
     * @param usePacketBufferReceiver when {@code true} packets are delivered through a
     *                                {@code PacketBufferReceiver}; otherwise through a
     *                                {@code PacketReceiver}
     * @throws IOException if creating, reading from, or closing the reader fails
     */
    private final void readUsingMemoryMappedReader(int count, int warmupTime, boolean usePacketBufferReceiver) throws IOException {
        final PacketProcessor sink = new PacketProcessor();
        final long createStart = System.nanoTime();
        final PktRecoveryLog.MemoryMappedReader created;
        if (usePacketBufferReceiver) {
            created = this._log.createMemoryMappedReader((PktRecoveryLog.IPacketBufferReceiver)new PacketBufferReceiver(sink));
        } else {
            created = this._log.createMemoryMappedReader((PktRecoveryLog.IPacketReceiver)new PacketReceiver(sink));
        }
        try (PktRecoveryLog.MemoryMappedReader reader = created) {
            final long createMicros = (System.nanoTime() - createStart) / 1000L;
            System.out.println("Created reader in " + this._dfmt.format(createMicros) + " us");
            sink.start(warmupTime);
            // Each read() call drives the receiver callbacks, which advance the processor's count.
            while (sink.count() < count) {
                reader.read();
            }
            sink.done();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reads packets back one at a time through the log's iterative reader and prints the
     * read throughput.
     *
     * <p>Exactly {@code count} calls to {@code next()} are made; calls that return
     * {@code null} are skipped and not retried.
     *
     * @param count      number of {@code next()} calls to make
     * @param warmupTime warm-up period in seconds, forwarded to the processor
     * @throws IOException if creating, reading from, or closing the reader fails
     */
    private final void readUsingIterativeReader(int count, int warmupTime) throws IOException {
        final long createStart = System.nanoTime();
        try (PktRecoveryLog.Reader reader = this._log.createReader()) {
            final long createMicros = (System.nanoTime() - createStart) / 1000L;
            System.out.println("Created reader in " + this._dfmt.format(createMicros) + " us");
            final PacketProcessor sink = new PacketProcessor();
            sink.start(warmupTime);
            int attempts = 0;
            while (attempts < count) {
                final PktPacket next = reader.next();
                if (next != null) {
                    sink.handlePacket(next);
                }
                ++attempts;
            }
            sink.done();
        }
    }

    /**
     * Reads the log back using either the memory mapped reader or the iterative reader.
     *
     * @param count                   number of packets to read
     * @param warmupTime              warm-up period in seconds
     * @param readUsingMappedMemory   selects the memory mapped reader when {@code true}
     * @param usePacketBufferReceiver only used with the memory mapped reader; selects the
     *                                receiver type
     * @throws Exception if reading fails
     */
    private final void read(int count, int warmupTime, boolean readUsingMappedMemory, boolean usePacketBufferReceiver) throws Exception {
        System.out.println("Reading (memoryMapped=" + readUsingMappedMemory + ", count=" + count + ")...");
        if (!readUsingMappedMemory) {
            this.readUsingIterativeReader(count, warmupTime);
            return;
        }
        this.readUsingMemoryMappedReader(count, warmupTime, usePacketBufferReceiver);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the benchmark: prints the parameters, calibrates the {@code nanoTime()} overhead,
     * opens the log, writes {@code count} packets and reads them back.
     *
     * <p>In tail mode the read happens concurrently on a separate thread while the write is
     * in progress. Otherwise the log is closed and reopened after the write and then read
     * on the calling thread. The log is always closed before this method returns.
     *
     * @param count                       number of packets to write and read
     * @param warmupTime                  warm-up period in seconds
     * @param rate                        target write rate in packets per second
     * @param sync                        whether to pass the sync flag on write/flush
     * @param supportTailing              whether to open the log with the tailing flag;
     *                                    forced to {@code true} when {@code tail} is set
     * @param tail                        whether to read concurrently with the write
     * @param populateStorePacketMetadata forwarded to {@code write}
     * @param flushAfter                  forwarded to {@code write}
     * @param readUsingMappedMemory       forwarded to {@code read}
     * @param usePacketBufferReceiver     forwarded to {@code read}
     * @param noLatencyWrites             forwarded to {@code write}
     * @param printIntervalStats          forwarded to {@code write}
     * @throws Exception if opening, writing, reading or closing the log fails, or if the
     *                   calling thread is interrupted while waiting for the tail reader
     */
    private final void run(final int count, final int warmupTime, int rate, boolean sync, boolean supportTailing, boolean tail, boolean populateStorePacketMetadata, int flushAfter, final boolean readUsingMappedMemory, final boolean usePacketBufferReceiver, boolean noLatencyWrites, boolean printIntervalStats) throws Exception {
        if (tail) {
            supportTailing = true;
        }
        System.out.println("");
        System.out.println("*** Run Parameters");
        System.out.println("*** ...populateStorePacketMetadata=" + populateStorePacketMetadata);
        System.out.println("*** ...flushAfter=" + flushAfter + " (0=flush disabled)");
        System.out.println("*** ...sync=" + sync);
        System.out.println("***");
        System.out.println("*** ...count=" + count);
        System.out.println("*** ...warmupTime=" + warmupTime);
        System.out.println("*** ...rate=" + rate);
        System.out.println("***");
        System.out.println("*** ...readUsingMappedMemory=" + readUsingMappedMemory);
        System.out.println("*** ...usePacketBufferReceiver=" + usePacketBufferReceiver);
        System.out.println("*** ...supportTailing=" + supportTailing);
        System.out.println("*** ...tail=" + tail);
        System.out.println("***");
        System.out.println("*** ...noLatencyWrites=" + noLatencyWrites);
        System.out.println("*** ...printIntervalStats=" + printIntervalStats);
        System.out.println("");
        System.out.println("Calculating nanoTime() overhead...");
        // Take the start timestamp after the println so console I/O is not part of the sample.
        final long start = System.nanoTime();
        for (int i = 0; i < 100000000; ++i) {
            System.nanoTime();
        }
        final long nanoTimeOverhead = (System.nanoTime() - start) / 100000000L;
        // NOTE(review): 128 is presumably the "support tailing" open flag - confirm
        // against PktRecoveryLog.
        final int openFlags = supportTailing ? 128 : 0;
        try {
            System.out.println("Opening log...");
            long ts = System.nanoTime();
            this._log.open(openFlags);
            System.out.println("Opened log in " + this._dfmt.format((System.nanoTime() - ts) / 1000L) + " us");
            Thread tailReader = null;
            if (tail) {
                tailReader = new Thread(){

                    @Override
                    public final void run() {
                        try {
                            PacketLogger.this.read(count, warmupTime, readUsingMappedMemory, usePacketBufferReceiver);
                        }
                        catch (Throwable e) {
                            e.printStackTrace();
                        }
                    }
                };
                tailReader.start();
            }
            this.write(count, warmupTime, rate, sync, supportTailing, populateStorePacketMetadata, flushAfter, nanoTimeOverhead, noLatencyWrites, printIntervalStats);
            this._log.flush(sync ? 1 : 0);
            System.out.println("");
            if (tailReader != null) {
                // Wait for the concurrent reader before the finally block closes the log;
                // previously the log was closed while the reader could still be reading.
                // Only done on the success path: if the write failed, the reader may never
                // see all packets and joining would block forever.
                tailReader.join();
            } else {
                System.out.println("Closing log...");
                ts = System.nanoTime();
                this._log.close();
                System.out.println("Log closed in " + this._dfmt.format((System.nanoTime() - ts) / 1000L) + " us");
                System.out.println("Opening log...");
                ts = System.nanoTime();
                this._log.open(openFlags);
                System.out.println("Opened log in " + this._dfmt.format((System.nanoTime() - ts) / 1000L) + " us");
                this.read(count, warmupTime, readUsingMappedMemory, usePacketBufferReceiver);
            }
        }
        finally {
            System.out.println("Closing log...");
            this._log.close();
        }
    }

    /**
     * Prints the command line usage to standard error.
     *
     * <p>The short option letters listed here must stay in sync with the options
     * registered in {@code main}.
     */
    private static void printUsage() {
        System.err.println("Usage PacketLogger");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-l, --logLocation} the directory where to create the log]");
        System.err.println("    Specifies the directory where the log should be created (default=\".\")");
        System.err.println("  [{-i, --initialLogLength} the preallocated length of the log]");
        System.err.println("    Specifies the preallocated length (in gigabytes) of the log (default=1)");
        System.err.println("  [{-z, --zeroOutInitial} zeroes out the preallocated length. only applies if --initialLogLength is specified and > 0]");
        System.err.println("    Specifies whether to zero out the preallocated length of the log (default=false)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-v, --populateStorePacketMetadata} populate metadata in packets being written as done by ODS binding when storing/replicating packets]");
        System.err.println("    Populate packet metadata in packets being written (default=false)");
        System.err.println("  [{-w, --writeBufferSize} specifies the write buffer size]");
        System.err.println("    Specifies (in bytes) the threshold on in-memory cached log entries that trigger automatic flushes (default=8192)");
        System.err.println("  [{-f, --flushAfter} number of writes to flush after]");
        System.err.println("    Number of writes to flush after (default=0 i.e. no explicit flush)");
        System.err.println("  [{-m, --flushUsingMappedMemory} whether to flush using a memory mapped IO");
        System.err.println("    Specifies whether to use a memory mapped IO to perform flush operations (default=false)");
        System.err.println("  [{-d, --flushDirectFromPacket} whether to flush directly from packet or not");
        System.err.println("    Specifies whether to flush directly from packet to the file (default=false)");
        System.err.println("  [{-y, --sync} sync during flush]");
        System.err.println("    Sync when flushing (default=false)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-k, --readBufferSize} specifies the read buffer size");
        System.err.println("    Specifies (in bytes) the size of the buffer used to perform reads. Does not apply if --readUsingMappedMemory is specified (default=8192)");
        System.err.println("  [{-e, --readUsingMappedMemory} whether to read using a memory mapped IO");
        System.err.println("    Specifies whether to use memory mapped IO to perform read operations (default=true)");
        System.err.println("  [{-g, --usePacketBufferReceiver} whether to use a packet buffer receiver to receive packets on read.");
        System.err.println("    Specifies whether to use a packet buffer receiver to receive read packets. Only applies if --readUsingMappedMemory is specified  (default=true)");
        System.err.println("  [{-p, --pageSize} specifies the disk subsystem page size]");
        System.err.println("    Specifies (in bytes) the default for read buffer and write buffer sizes (default=4096)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-s, --supportTailing} run in mode that supports tailing]");
        System.err.println("    Run in mode that supports tailing (default=false)");
        System.err.println("  [{-x, --tail} run in tail mode]");
        System.err.println("    Run in tail mode i.e. concurrent read write. This option implicitly switches on --supportTailing (default=false)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-c, --count} number of packets to write]");
        System.err.println("    Number of packets to write (default=10,000,000)");
        System.err.println("  [{-t, --warmupTime} Warmup time]");
        System.err.println("    Warmup time, in seconds, for calculation of throughput stats (default=2 (2 seconds))");
        System.err.println("  [{-r, --rate} packet write rate]");
        System.err.println("    Rate at which to write packets (default=-1 (unlimited))");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-a, --noLatencyWrites} don't write latencies to a file");
        System.err.println("    Indicates that latencies should not be written to a file (default=false)");
        System.err.println("  [{-b, --printIntervalStats} print interval latency stats");
        System.err.println("    Indicates that latencies stats should be printed on a periodic basis in addition to at the end (default=false)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-j, --affinity} CPU affinity of the write/read thread");
        System.err.println("    Sets the CPU affinity of the thread performing the read/write (default=none)");
        System.err.println("--------------------------------------------------------------------------------------------------------------------");
        System.err.println("  [{-h, --help} print this help string]");
        System.err.println("");
    }

    /**
     * Command line entry point: parses the options, optionally pins the CPU affinity,
     * then creates a {@code PacketLogger} and runs the benchmark.
     *
     * <p>Every short option letter is unique. Previously {@code -a}, {@code -b} and
     * {@code -t} were each registered twice; the later registration replaced the earlier
     * one, so the short forms of {@code --writeBufferSize}, {@code --readBufferSize} and
     * {@code --supportTailing} were unreachable. Those three now use {@code -w},
     * {@code -k} and {@code -s}; the letters that did work ({@code -a} noLatencyWrites,
     * {@code -b} printIntervalStats, {@code -t} warmupTime) keep their meaning.
     *
     * @param args the command line arguments
     * @throws Exception if the benchmark fails
     */
    public static final void main(String[] args) throws Exception {
        CmdLineParser parser = new CmdLineParser();
        CmdLineParser.Option logLocationOption = parser.addStringOption('l', "logLocation");
        CmdLineParser.Option initialLogLengthOption = parser.addIntegerOption('i', "initialLogLength");
        CmdLineParser.Option zeroOutInitialOption = parser.addBooleanOption('z', "zeroOutInitial");
        CmdLineParser.Option populateStorePacketMetadataOption = parser.addBooleanOption('v', "populateStorePacketMetadata");
        CmdLineParser.Option writeBufferSizeOption = parser.addIntegerOption('w', "writeBufferSize");
        CmdLineParser.Option flushAfterOption = parser.addIntegerOption('f', "flushAfter");
        CmdLineParser.Option flushUsingMappedMemoryOption = parser.addBooleanOption('m', "flushUsingMappedMemory");
        CmdLineParser.Option flushDirectFromPacketOption = parser.addBooleanOption('d', "flushDirectFromPacket");
        CmdLineParser.Option syncOption = parser.addBooleanOption('y', "sync");
        CmdLineParser.Option readBufferSizeOption = parser.addIntegerOption('k', "readBufferSize");
        CmdLineParser.Option readUsingMappedMemoryOption = parser.addBooleanOption('e', "readUsingMappedMemory");
        CmdLineParser.Option usePacketBufferReceiverOption = parser.addBooleanOption('g', "usePacketBufferReceiver");
        CmdLineParser.Option pageSizeOption = parser.addIntegerOption('p', "pageSize");
        CmdLineParser.Option supportTailingOption = parser.addBooleanOption('s', "supportTailing");
        CmdLineParser.Option tailOption = parser.addBooleanOption('x', "tail");
        CmdLineParser.Option countOption = parser.addIntegerOption('c', "count");
        CmdLineParser.Option warmupTimeOption = parser.addIntegerOption('t', "warmupTime");
        CmdLineParser.Option rateOption = parser.addIntegerOption('r', "rate");
        CmdLineParser.Option noLatencyWritesOption = parser.addBooleanOption('a', "noLatencyWrites");
        CmdLineParser.Option printIntervalStatsOption = parser.addBooleanOption('b', "printIntervalStats");
        CmdLineParser.Option affinityOption = parser.addStringOption('j', "affinity");
        CmdLineParser.Option helpOption = parser.addBooleanOption('h', "help");
        try {
            parser.parse(args);
            if (((Boolean)parser.getOptionValue(helpOption, (Object)Boolean.FALSE)).booleanValue()) {
                PacketLogger.printUsage();
                return;
            }
            String affinityStr = (String)parser.getOptionValue(affinityOption, null);
            if (affinityStr != null) {
                System.setProperty("nv.tuning.cpu.enableaffinitymasks", "true");
                UtlThread.setCPUAffinityMask((long)UtlThread.parseAffinityMask((String)affinityStr));
            }

            // Log construction parameters.
            String logLocation = (String)parser.getOptionValue(logLocationOption, (Object)".");
            int initialLogLength = (Integer)parser.getOptionValue(initialLogLengthOption, (Object)Integer.valueOf(1));
            boolean zeroOutInitial = (Boolean)parser.getOptionValue(zeroOutInitialOption, (Object)Boolean.FALSE);
            int pageSize = (Integer)parser.getOptionValue(pageSizeOption, (Object)Integer.valueOf(4096));
            int readBufferSize = (Integer)parser.getOptionValue(readBufferSizeOption, (Object)Integer.valueOf(8192));
            int writeBufferSize = (Integer)parser.getOptionValue(writeBufferSizeOption, (Object)Integer.valueOf(8192));
            boolean flushUsingMappedMemory = (Boolean)parser.getOptionValue(flushUsingMappedMemoryOption, (Object)Boolean.FALSE);
            boolean flushDirectFromPacket = (Boolean)parser.getOptionValue(flushDirectFromPacketOption, (Object)Boolean.FALSE);

            // Run parameters.
            int count = (Integer)parser.getOptionValue(countOption, (Object)Integer.valueOf(10000000));
            int warmupTime = (Integer)parser.getOptionValue(warmupTimeOption, (Object)Integer.valueOf(2));
            int rate = (Integer)parser.getOptionValue(rateOption, (Object)Integer.valueOf(-1));
            boolean sync = (Boolean)parser.getOptionValue(syncOption, (Object)Boolean.FALSE);
            boolean supportTailing = (Boolean)parser.getOptionValue(supportTailingOption, (Object)Boolean.FALSE);
            boolean tail = (Boolean)parser.getOptionValue(tailOption, (Object)Boolean.FALSE);
            boolean populateStorePacketMetadata = (Boolean)parser.getOptionValue(populateStorePacketMetadataOption, (Object)Boolean.FALSE);
            int flushAfter = (Integer)parser.getOptionValue(flushAfterOption, (Object)Integer.valueOf(0));
            // NOTE(review): these two default to true, so passing the flag cannot switch
            // them off; the iterative reader looks unreachable from the command line - confirm.
            boolean readUsingMappedMemory = (Boolean)parser.getOptionValue(readUsingMappedMemoryOption, (Object)Boolean.TRUE);
            boolean usePacketBufferReceiver = (Boolean)parser.getOptionValue(usePacketBufferReceiverOption, (Object)Boolean.TRUE);
            boolean noLatencyWrites = (Boolean)parser.getOptionValue(noLatencyWritesOption, (Object)Boolean.FALSE);
            boolean printIntervalStats = (Boolean)parser.getOptionValue(printIntervalStatsOption, (Object)Boolean.FALSE);

            PacketLogger logger = new PacketLogger(logLocation, initialLogLength, zeroOutInitial, pageSize, readBufferSize, writeBufferSize, flushUsingMappedMemory, flushDirectFromPacket);
            logger.run(count, warmupTime, rate, sync, supportTailing, tail, populateStorePacketMetadata, flushAfter, readUsingMappedMemory, usePacketBufferReceiver, noLatencyWrites, printIntervalStats);
        }
        catch (CmdLineParser.OptionException e) {
            System.err.println(e.getMessage());
            PacketLogger.printUsage();
        }
    }

    /**
     * Receiver that is handed raw buffer positions by the memory mapped reader. Each
     * callback deserializes the bytes into one reusable packet and passes that packet to
     * the processor.
     */
    private final class PacketBufferReceiver
    implements PktRecoveryLog.IPacketBufferReceiver {
        private final PacketProcessor _processor;
        private final PktFactory _packetFactory;
        /** Single packet instance reused for every callback. */
        private final PktPacket _packet;

        /**
         * @param processor the processor that counts (and disposes) each received packet
         */
        PacketBufferReceiver(PacketProcessor processor) {
            final PktFactory factory = PktFactory.getInstance();
            this._processor = processor;
            this._packetFactory = factory;
            this._packet = factory.createPacket(257);
        }

        /**
         * Deserializes the packet found at {@code bufferPosition} and hands it to the processor.
         *
         * @param buffer         the buffer holding the serialized packet
         * @param bufferPosition offset of the packet within {@code buffer}
         * @param packetLength   not used by this receiver
         * @param filePosition   not used by this receiver
         */
        public final void onPacket(IOBuffer buffer, int bufferPosition, int packetLength, long filePosition) {
            final PktPacket reusable = this._packet;
            reusable.deserialize(buffer, bufferPosition, true);
            // The processor disposes every packet it handles; presumably this acquire
            // balances that dispose so the shared packet stays usable - confirm.
            reusable.acquire();
            this._processor.handlePacket(reusable);
        }

        /**
         * Buffer change notification; intentionally a no-op.
         */
        public final void onBufferChange(IOBuffer from, IOBuffer to) {
            // nothing to do
        }
    }

    /**
     * Receiver that is handed ready-made packets by the memory mapped reader and forwards
     * each one to the processor.
     */
    private final class PacketReceiver
    implements PktRecoveryLog.IPacketReceiver {
        private final PacketProcessor _processor;

        /**
         * @param processor the processor that counts (and disposes) each received packet
         */
        PacketReceiver(PacketProcessor processor) {
            this._processor = processor;
        }

        /**
         * Forwards the packet to the processor unchanged.
         *
         * @param packet the packet delivered by the reader
         */
        public final void onPacket(PktPacket packet) {
            final PacketProcessor sink = this._processor;
            sink.handlePacket(packet);
        }
    }

    /**
     * Counts packets handed to it during a read pass and reports the read throughput
     * achieved after the warm-up period.
     *
     * <p>Usage: call {@code start}, then {@code handlePacket} once per packet, then
     * {@code done} to print the result.
     */
    private final class PacketProcessor {
        /** Total number of packets handled since {@code start}. */
        private int _i;
        /** {@code nanoTime()} captured by {@code start}. */
        private long _start;
        /** Warm-up period in seconds. */
        private int _warmupTime;
        private boolean _warmupCompleted;
        /** Number of packets handled after warm-up completed. */
        private int _postWarmupCount;
        /** {@code nanoTime()} at which warm-up completed; 0 until then. */
        private long _postWarmupStart;

        private PacketProcessor() {
        }

        /**
         * Resets all counters and starts the warm-up clock.
         *
         * @param warmupTime warm-up period in seconds
         */
        final void start(int warmupTime) {
            this._i = 0;
            this._start = System.nanoTime();
            this._warmupTime = warmupTime;
            this._warmupCompleted = false;
            this._postWarmupCount = 0;
            this._postWarmupStart = 0L;
        }

        /**
         * Disposes the packet (when non-null), counts it, and marks warm-up as complete
         * once the warm-up period has elapsed.
         *
         * @param packet the packet to account for; {@code null} is counted but not disposed
         */
        final void handlePacket(PktPacket packet) {
            if (packet != null) {
                packet.dispose();
            }
            ++this._i;
            if (this._warmupCompleted) {
                ++this._postWarmupCount;
            }
            long current = System.nanoTime();
            if (!this._warmupCompleted && current - this._start > (long)this._warmupTime * 1000000000L) {
                System.out.println("Warm up complete.");
                this._postWarmupStart = current;
                this._warmupCompleted = true;
            }
        }

        /**
         * @return the total number of packets handled since {@code start}
         */
        final int count() {
            return this._i;
        }

        /**
         * Prints the number of packets read after warm-up and the corresponding rate.
         *
         * <p>The rate is reported as 0 when warm-up never completed or when no measurable
         * time has elapsed since it completed. The latter can happen if warm-up completes
         * on the last handled packet and the timer is coarse, which previously caused a
         * division by zero.
         */
        final void done() {
            long elapsed = System.nanoTime() - this._postWarmupStart;
            int overallRate = this._warmupCompleted && elapsed > 0L ? (int)((long)this._postWarmupCount * 1000000000L / elapsed) : 0;
            System.out.println("Read " + PacketLogger.this._dfmt.format(this._postWarmupCount) + " packets @ " + PacketLogger.this._dfmt.format(overallRate) + " pkts/sec post warmup.");
        }
    }
}

