package org.reaktivity.nukleus.http_push.internal.bench;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Control;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.http_push.internal.HttpPushController;
import org.reaktivity.nukleus.http_push.internal.HttpPushStreams;
import org.reaktivity.nukleus.http_push.internal.types.Flyweight;
import org.reaktivity.nukleus.http_push.internal.types.HttpHeaderFW;
import org.reaktivity.nukleus.http_push.internal.types.ListFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.http_push.internal.types.stream.WindowFW;
import org.reaktivity.reaktor.Reaktor;
import org.reaktivity.reaktor.ReaktorBuilder;
import org.reaktivity.reaktor.matchers.NukleusMatcher;

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(3)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode({Mode.Throughput})
/* loaded from: input_file:org/reaktivity/nukleus/http_push/internal/bench/HttpPushServerBM.class */
public class HttpPushServerBM {
    private final Configuration configuration;
    private final Reaktor reaktor;
    private HttpPushController controller;
    private final BeginFW beginRO;
    private final DataFW dataRO;
    private final BeginFW.Builder beginRW;
    private final DataFW.Builder dataRW;
    private final WindowFW.Builder windowRW;
    private final HttpBeginExFW.Builder httpBeginExRW;
    private HttpPushStreams sourceInputStreams;
    private HttpPushStreams sourceOutputEstStreams;
    private MutableDirectBuffer throttleBuffer;
    private long sourceInputRef;
    private long targetInputRef;
    private long sourceInputId;
    private DataFW data;
    private MessageHandler sourceOutputEstHandler;

    public HttpPushServerBM() {
        Properties properties = new Properties();
        properties.setProperty("nuklei.directory", "target/nukleus-benchmarks");
        properties.setProperty("nuklei.streams.buffer.capacity", Long.toString(16777216L));
        String str = "http-push";
        NukleusMatcher nukleusMatcher = (v1) -> {
            return r0.equals(v1);
        };
        this.configuration = new Configuration(properties);
        try {
            Files.walk(this.configuration.directory(), FileVisitOption.FOLLOW_LINKS).map((v0) -> {
                return v0.toFile();
            }).forEach((v0) -> {
                v0.delete();
            });
        } catch (IOException e) {
            LangUtil.rethrowUnchecked(e);
        }
        ReaktorBuilder discover = Reaktor.builder().config(this.configuration).discover(nukleusMatcher);
        Class<HttpPushController> cls = HttpPushController.class;
        HttpPushController.class.getClass();
        this.reaktor = discover.discover(cls::isAssignableFrom).errorHandler(th -> {
            th.printStackTrace(System.err);
        }).build();
        this.controller = this.reaktor.controller(HttpPushController.class);
        this.beginRO = new BeginFW();
        this.dataRO = new DataFW();
        this.beginRW = new BeginFW.Builder();
        this.dataRW = new DataFW.Builder();
        this.windowRW = new WindowFW.Builder();
        this.httpBeginExRW = new HttpBeginExFW.Builder();
    }

    @Setup(Level.Trial)
    public void reinit() throws Exception {
        Random random = new Random();
        HttpPushController controller = this.reaktor.controller(HttpPushController.class);
        this.targetInputRef = random.nextLong();
        this.sourceInputRef = ((Long) controller.routeProxy("source", 0L, "target", this.targetInputRef).get()).longValue();
        this.sourceInputStreams = controller.streams("source");
        this.sourceOutputEstStreams = controller.streams("http-push", "target");
        this.sourceInputId = random.nextLong();
        this.sourceOutputEstHandler = this::processBegin;
        builder -> {
            builder.item(builder -> {
                builder.name(":scheme").value("http");
            });
            builder.item(builder2 -> {
                builder2.name(":method").value("GET");
            });
            builder.item(builder3 -> {
                builder3.name(":path").value("/");
            });
            builder.item(builder4 -> {
                builder4.name("host").value("localhost:8080");
            });
            builder.item(builder5 -> {
                builder5.name("upgrade").value("websocket");
            });
            builder.item(builder6 -> {
                builder6.name("sec-websocket-key").value("dGhlIHNhbXBsZSBub25jZQ==");
            });
            builder.item(builder7 -> {
                builder7.name("sec-websocket-version").value("13");
            });
        };
        UnsafeBuffer unsafeBuffer = new UnsafeBuffer(new byte[256]);
        BeginFW build = this.beginRW.wrap(unsafeBuffer, 0, unsafeBuffer.capacity()).streamId(this.sourceInputId).source("http-push").sourceRef(this.sourceInputRef).correlationId(random.nextLong()).build();
        this.sourceInputStreams.writeStreams(build.typeId(), build.buffer(), build.offset(), build.sizeof());
        byte[] bytes = "Hello, world".getBytes(StandardCharsets.UTF_8);
        byte[] bArr = {-126, -116, 1, 2, 3, 4, (byte) (bytes[0] ^ bArr[2]), (byte) (bytes[1] ^ bArr[3]), (byte) (bytes[2] ^ bArr[4]), (byte) (bytes[3] ^ bArr[5]), (byte) (bytes[4] ^ bArr[2]), (byte) (bytes[5] ^ bArr[3]), (byte) (bytes[6] ^ bArr[4]), (byte) (bytes[7] ^ bArr[5]), (byte) (bytes[8] ^ bArr[2]), (byte) (bytes[9] ^ bArr[3]), (byte) (bytes[10] ^ bArr[4]), (byte) (bytes[11] ^ bArr[5])};
        this.data = this.dataRW.wrap(unsafeBuffer, 0, unsafeBuffer.capacity()).streamId(this.sourceInputId).payload(builder2 -> {
            builder2.set(bArr);
        }).build();
        this.throttleBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(12));
    }

    @TearDown(Level.Trial)
    public void reset() throws Exception {
        this.reaktor.controller(HttpPushController.class).unrouteProxy("source", this.sourceInputRef, "target", this.targetInputRef).get();
        this.sourceInputStreams.close();
        this.sourceInputStreams = null;
        this.sourceOutputEstStreams.close();
        this.sourceOutputEstStreams = null;
    }

    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public void writer(Control control) throws Exception {
        while (!control.stopMeasurement && !this.sourceInputStreams.writeStreams(this.data.typeId(), this.data.buffer(), 0, this.data.limit())) {
            Thread.yield();
        }
        while (!control.stopMeasurement && this.sourceInputStreams.readThrottle((i, mutableDirectBuffer, i2, i3) -> {
        }) == 0) {
            Thread.yield();
        }
    }

    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public void reader(Control control) throws Exception {
        while (!control.stopMeasurement && this.sourceOutputEstStreams.readStreams(this::handleReply) == 0) {
            Thread.yield();
        }
    }

    private void handleReply(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        this.sourceOutputEstHandler.onMessage(i, mutableDirectBuffer, i2, i3);
    }

    private void processBegin(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        this.beginRO.wrap(mutableDirectBuffer, i2, i2 + i3);
        doWindow(this.beginRO.streamId(), 8192);
        this.sourceOutputEstHandler = this::processData;
    }

    private void processData(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
        this.dataRO.wrap(mutableDirectBuffer, i2, i2 + i3);
        doWindow(this.dataRO.streamId(), this.dataRO.payload().sizeof());
    }

    private void doWindow(long j, int i) {
        WindowFW build = this.windowRW.wrap(this.throttleBuffer, 0, this.throttleBuffer.capacity()).streamId(j).update(i).build();
        this.sourceOutputEstStreams.writeThrottle(build.typeId(), build.buffer(), build.offset(), build.sizeof());
    }

    private Flyweight.Builder.Visitor visitHttpBeginEx(Consumer<ListFW.Builder<HttpHeaderFW.Builder, HttpHeaderFW>> consumer) {
        return (mutableDirectBuffer, i, i2) -> {
            return this.httpBeginExRW.wrap(mutableDirectBuffer, i, i2).headers(consumer).build().sizeof();
        };
    }

    public static void main(String[] strArr) throws RunnerException {
        new Runner(new OptionsBuilder().include(HttpPushServerBM.class.getSimpleName()).forks(0).build()).run();
    }
}
