package org.reaktivity.nukleus.http_cache.internal.stream;

import java.util.Objects;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.ToIntFunction;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.http_cache.internal.proxy.cache.HttpStatus;
import org.reaktivity.nukleus.http_cache.internal.stream.util.HttpHeaders;
import org.reaktivity.nukleus.http_cache.internal.stream.util.Writer;
import org.reaktivity.nukleus.http_cache.internal.types.control.RouteFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.EndFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;

/* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/stream/ServerStreamFactory.class */
public class ServerStreamFactory implements StreamFactory {
    private final RouteFW routeRO = new RouteFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final RouteManager router;
    private final LongUnaryOperator supplyReplyId;
    private final LongSupplier supplyTrace;
    private final Writer writer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/stream/ServerStreamFactory$ServerAcceptStream.class */
    public final class ServerAcceptStream {
        private final MessageConsumer acceptReply;
        private final long acceptRouteId;
        private final long acceptInitialId;
        private final LongSupplier supplyTrace;
        private MessageConsumer streamState;
        private long acceptReplyId;

        private ServerAcceptStream(MessageConsumer messageConsumer, long j, long j2, LongSupplier longSupplier) {
            this.acceptReply = messageConsumer;
            this.acceptRouteId = j;
            this.acceptInitialId = j2;
            this.supplyTrace = longSupplier;
            this.streamState = this::beforeBegin;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onStreamMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            this.streamState.accept(i, directBuffer, i2, i3);
        }

        private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i == 1) {
                onBegin(ServerStreamFactory.this.beginRO.wrap(directBuffer, i2, i2 + i3));
            } else {
                ServerStreamFactory.this.writer.doReset(this.acceptReply, this.acceptRouteId, this.acceptInitialId, this.supplyTrace.getAsLong());
            }
        }

        private void afterBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    onData(ServerStreamFactory.this.dataRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 3:
                    onEnd(ServerStreamFactory.this.endRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 4:
                    onAbort(ServerStreamFactory.this.abortRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    ServerStreamFactory.this.writer.doReset(this.acceptReply, this.acceptRouteId, this.acceptInitialId, this.supplyTrace.getAsLong());
                    return;
            }
        }

        private void onThrottleMessage(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    onReset(ServerStreamFactory.this.resetRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                case 1073741826:
                    onWindow(ServerStreamFactory.this.windowRO.wrap(directBuffer, i2, i2 + i3));
                    return;
                default:
                    return;
            }
        }

        private void onBegin(BeginFW beginFW) {
            this.acceptReplyId = ServerStreamFactory.this.supplyReplyId.applyAsLong(beginFW.streamId());
            ServerStreamFactory.this.writer.doWindow(this.acceptReply, this.acceptRouteId, beginFW.streamId(), this.supplyTrace.getAsLong(), 0, 0, 0L);
            ServerStreamFactory.this.writer.doHttpResponse(this.acceptReply, this.acceptRouteId, this.acceptReplyId, beginFW.trace(), builder -> {
                builder.item(builder -> {
                    builder.name(HttpHeaders.STATUS).value(HttpStatus.OK_200);
                });
                builder.item(builder2 -> {
                    builder2.name("content-type").value("text/event-stream");
                });
            });
            this.streamState = this::afterBegin;
            ServerStreamFactory.this.router.setThrottle(this.acceptReplyId, this::onThrottleMessage);
        }

        private void onData(DataFW dataFW) {
            ServerStreamFactory.this.writer.doReset(this.acceptReply, this.acceptRouteId, this.acceptInitialId, this.supplyTrace.getAsLong());
        }

        private void onEnd(EndFW endFW) {
        }

        private void onAbort(AbortFW abortFW) {
            ServerStreamFactory.this.writer.doAbort(this.acceptReply, this.acceptRouteId, this.acceptReplyId, abortFW.trace());
        }

        private void onWindow(WindowFW windowFW) {
        }

        private void onReset(ResetFW resetFW) {
            ServerStreamFactory.this.writer.doReset(this.acceptReply, this.acceptRouteId, this.acceptInitialId, resetFW.trace());
        }
    }

    public ServerStreamFactory(RouteManager routeManager, MutableDirectBuffer mutableDirectBuffer, LongUnaryOperator longUnaryOperator, LongSupplier longSupplier, ToIntFunction<String> toIntFunction) {
        this.supplyTrace = (LongSupplier) Objects.requireNonNull(longSupplier);
        this.router = (RouteManager) Objects.requireNonNull(routeManager);
        this.supplyReplyId = (LongUnaryOperator) Objects.requireNonNull(longUnaryOperator);
        this.writer = new Writer(routeManager, toIntFunction, mutableDirectBuffer);
    }

    public MessageConsumer newStream(int i, DirectBuffer directBuffer, int i2, int i3, MessageConsumer messageConsumer) {
        BeginFW wrap = this.beginRO.wrap(directBuffer, i2, i2 + i3);
        MessageConsumer messageConsumer2 = null;
        if ((wrap.streamId() & 1) != 0) {
            messageConsumer2 = newAcceptStream(wrap, messageConsumer);
        }
        return messageConsumer2;
    }

    private MessageConsumer newAcceptStream(BeginFW beginFW, MessageConsumer messageConsumer) {
        long routeId = beginFW.routeId();
        MessageConsumer messageConsumer2 = null;
        if (((RouteFW) this.router.resolve(routeId, beginFW.authorization(), (i, directBuffer, i2, i3) -> {
            return true;
        }, this::wrapRoute)) != null) {
            ServerAcceptStream serverAcceptStream = new ServerAcceptStream(messageConsumer, routeId, beginFW.streamId(), this.supplyTrace);
            messageConsumer2 = (i4, directBuffer2, i5, i6) -> {
                serverAcceptStream.onStreamMessage(i4, directBuffer2, i5, i6);
            };
        }
        return messageConsumer2;
    }

    private RouteFW wrapRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
        return this.routeRO.wrap(directBuffer, i2, i2 + i3);
    }
}
