package org.reaktivity.nukleus.http.internal.bench;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.IoUtil;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Control;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.http.internal.HttpController;
import org.reaktivity.nukleus.http.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteKind;
import org.reaktivity.reaktor.Reaktor;
import org.reaktivity.reaktor.ReaktorBuilder;
import org.reaktivity.reaktor.ReaktorConfiguration;

@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
@Fork(3)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode({Mode.Throughput})
/* loaded from: input_file:org/reaktivity/nukleus/http/internal/bench/HttpServerBM.class */
public class HttpServerBM {

    @State(Scope.Group)
    /* loaded from: input_file:org/reaktivity/nukleus/http/internal/bench/HttpServerBM$GroupState.class */
    public static class GroupState {
        private final ReaktorConfiguration configuration;
        private final Reaktor reaktor;
        private final BeginFW beginRO;
        private final DataFW dataRO;
        private final WindowFW windowRO;
        private final BeginFW.Builder beginRW;
        private final DataFW.Builder dataRW;
        private final WindowFW.Builder windowRW;
        private MutableDirectBuffer throttleBuffer;
        private long sourceRouteId;
        private Writer sourceInput;
        private Reader sourceOutputEst;
        private long sourceInputId;
        private DataFW data;
        private MessageConsumer sourceOutputEstHandler;
        int availableSourceInputWindow;
        int padding;
        public int writeFails;
        public int readFails;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/reaktivity/nukleus/http/internal/bench/HttpServerBM$GroupState$Reader.class */
        public class Reader {
            private final ToIntFunction<MessageConsumer> streams;
            private final MessagePredicate throttle;

            Reader(ToIntFunction<MessageConsumer> toIntFunction, MessagePredicate messagePredicate) {
                this.throttle = messagePredicate;
                this.streams = toIntFunction;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/reaktivity/nukleus/http/internal/bench/HttpServerBM$GroupState$Writer.class */
        public class Writer {
            private final MessagePredicate streams;
            private final ToIntFunction<MessageConsumer> throttle;

            Writer(MessagePredicate messagePredicate, ToIntFunction<MessageConsumer> toIntFunction) {
                this.streams = messagePredicate;
                this.throttle = toIntFunction;
            }
        }

        public GroupState() {
            Properties properties = new Properties();
            properties.setProperty(ReaktorConfiguration.REAKTOR_DIRECTORY.name(), "target/nukleus-benchmarks");
            properties.setProperty(ReaktorConfiguration.REAKTOR_STREAMS_BUFFER_CAPACITY.name(), Long.toString(16777216L));
            this.configuration = new ReaktorConfiguration(properties);
            IoUtil.ensureDirectoryExists(this.configuration.directory().toFile(), this.configuration.directory().toString());
            try {
                Files.walk(this.configuration.directory(), FileVisitOption.FOLLOW_LINKS).map((v0) -> {
                    return v0.toFile();
                }).forEach((v0) -> {
                    v0.delete();
                });
            } catch (IOException e) {
                LangUtil.rethrowUnchecked(e);
            }
            String str = "http";
            ReaktorBuilder nukleus = Reaktor.builder().config(this.configuration).nukleus((v1) -> {
                return r2.equals(v1);
            });
            String str2 = "http";
            this.reaktor = nukleus.controller((v1) -> {
                return r2.equals(v1);
            }).errorHandler(th -> {
                th.printStackTrace(System.err);
            }).build();
            this.beginRO = new BeginFW();
            this.dataRO = new DataFW();
            this.windowRO = new WindowFW();
            this.beginRW = new BeginFW.Builder();
            this.dataRW = new DataFW.Builder();
            this.windowRW = new WindowFW.Builder();
            this.availableSourceInputWindow = 0;
        }

        @Setup(Level.Trial)
        public void reinit() throws Exception {
            Random random = new Random();
            this.sourceRouteId = ((Long) this.reaktor.controller(HttpController.class).route(RouteKind.SERVER, "http#0", "echo").get()).longValue();
            this.sourceInputId = random.nextLong();
            this.sourceOutputEstHandler = this::processBegin;
            UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[256]);
            BeginFW build = this.beginRW.wrap(unsafeBuffer, 0, unsafeBuffer.capacity()).routeId(this.sourceRouteId).streamId(this.sourceInputId).build();
            this.sourceInput.streams.test(build.typeId(), build.buffer(), build.offset(), build.sizeof());
            byte[] bytes = "POST / HTTP/1.1\r\nHost: localhost:8080\r\nContent-Length:12\r\n\r\nHello, world".getBytes(StandardCharsets.UTF_8);
            this.data = this.dataRW.wrap(unsafeBuffer, 0, unsafeBuffer.capacity()).streamId(this.sourceInputId).payload(builder -> {
                builder.set(bytes);
            }).build();
            this.throttleBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(12));
            boolean z = false;
            for (int i = 0; i < 100 && !z; i++) {
                Thread.sleep(100L);
                z = write();
            }
            if (!z) {
                throw new RuntimeException("reinit: write() failed");
            }
            for (int i2 = 0; i2 < 100 && this.sourceOutputEst == null; i2++) {
            }
            if (read() <= 0) {
                throw new RuntimeException("reinit: read() failed");
            }
        }

        @TearDown(Level.Trial)
        public void reset() throws Exception {
            this.reaktor.controller(HttpController.class).unroute(this.sourceRouteId).get();
            this.sourceInput = null;
            this.sourceOutputEst = null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int read() {
            return this.sourceOutputEst.streams.applyAsInt(this::handleSourceOutputEst);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean write() {
            this.sourceInput.throttle.applyAsInt(this::sourceInputThrottle);
            boolean z = this.availableSourceInputWindow >= this.data.length();
            if (z) {
                z = this.sourceInput.streams.test(this.data.typeId(), this.data.buffer(), 0, this.data.limit());
                if (z) {
                    this.availableSourceInputWindow -= this.data.length();
                } else {
                    System.out.println(String.format("write failed, availableSourceInputWindow = %d", Integer.valueOf(this.availableSourceInputWindow)));
                }
            }
            return z;
        }

        private void sourceInputThrottle(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    System.out.println("ERROR: reset detected in sourceInputThrottle");
                    return;
                case 1073741826:
                    this.windowRO.wrap(directBuffer, i2, i2 + i3);
                    this.padding = this.windowRO.padding();
                    return;
                default:
                    System.out.println(String.format("ERROR: unexpected msgTypeId %d detected in sourceInputThrottle", Integer.valueOf(i)));
                    return;
            }
        }

        private void handleSourceOutputEst(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.sourceOutputEstHandler.accept(i, directBuffer, i2, i3);
        }

        private void processBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.beginRO.wrap(directBuffer, i2, i2 + i3);
            doWindow(this.beginRO.streamId(), 8192);
            this.sourceOutputEstHandler = this::processData;
        }

        private void processData(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.dataRO.wrap(directBuffer, i2, i2 + i3);
            doWindow(this.dataRO.streamId(), this.dataRO.length());
        }

        private void doWindow(long j, int i) {
            WindowFW build = this.windowRW.wrap(this.throttleBuffer, 0, this.throttleBuffer.capacity()).streamId(j).build();
            this.sourceOutputEst.throttle.test(build.typeId(), build.buffer(), build.offset(), build.sizeof());
        }
    }

    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public int writer(GroupState groupState, Control control) throws Exception {
        boolean write;
        while (true) {
            write = groupState.write();
            if (write || control.stopMeasurement) {
                break;
            }
            Thread.yield();
        }
        return write ? 1 : 0;
    }

    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public int reader(GroupState groupState, Control control) throws Exception {
        int read;
        while (true) {
            read = groupState.read();
            if (read != 0 || control.stopMeasurement) {
                break;
            }
            Thread.yield();
        }
        return read;
    }

    public static void main(String[] strArr) throws RunnerException {
        new Runner(new OptionsBuilder().include(HttpServerBM.class.getSimpleName()).forks(0).threads(1).warmupIterations(0).measurementIterations(1).measurementTime(new TimeValue(10L, TimeUnit.SECONDS)).build()).run();
    }
}
