package org.reaktivity.nukleus.http.internal.routable.stream;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.http.internal.routable.Correlation;
import org.reaktivity.nukleus.http.internal.routable.Route;
import org.reaktivity.nukleus.http.internal.routable.Source;
import org.reaktivity.nukleus.http.internal.routable.Target;
import org.reaktivity.nukleus.http.internal.router.RouteKind;
import org.reaktivity.nukleus.http.internal.types.OctetsFW;
import org.reaktivity.nukleus.http.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http.internal.types.stream.EndFW;
import org.reaktivity.nukleus.http.internal.types.stream.FrameFW;
import org.reaktivity.nukleus.http.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.http.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.http.internal.util.BufferUtil;
import org.reaktivity.nukleus.http.internal.util.function.LongObjectBiConsumer;

/* loaded from: input_file:org/reaktivity/nukleus/http/internal/routable/stream/SourceInputStreamFactory.class */
public final class SourceInputStreamFactory {
    private static final byte[] CRLFCRLF_BYTES = "\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final WindowFW windowRO = new WindowFW();
    private final ResetFW resetRO = new ResetFW();
    private final Source source;
    private final LongFunction<List<Route>> supplyRoutes;
    private final LongSupplier supplyStreamId;
    private final Target rejectTarget;
    private final LongObjectBiConsumer<Correlation> correlateNew;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/reaktivity/nukleus/http/internal/routable/stream/SourceInputStreamFactory$DecoderState.class */
    public interface DecoderState {
        int decode(DirectBuffer directBuffer, int i, int i2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/nukleus/http/internal/routable/stream/SourceInputStreamFactory$SourceInputStream.class */
    public final class SourceInputStream {
        private MessageHandler streamState;
        private MessageHandler throttleState;
        private DecoderState decoderState;
        private long sourceId;
        private Target target;
        private long targetId;
        private long sourceRef;
        private long correlationId;
        private int window;
        private int contentRemaining;
        private int sourceUpdateDeferred;

        public String toString() {
            return String.format("%s[source=%s, sourceId=%016x, window=%d, targetId=%016x]", getClass().getSimpleName(), SourceInputStreamFactory.this.source.routableName(), Long.valueOf(this.sourceId), Integer.valueOf(this.window), Long.valueOf(this.targetId));
        }

        private SourceInputStream() {
            this.streamState = (v1, v2, v3, v4) -> {
                streamBeforeBegin(v1, v2, v3, v4);
            };
            this.throttleState = (v1, v2, v3, v4) -> {
                throttleSkipNextWindow(v1, v2, v3, v4);
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleStream(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            this.streamState.onMessage(i, mutableDirectBuffer, i2, i3);
        }

        private void streamBeforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
            if (i == 1) {
                processBegin(directBuffer, i2, i3);
            } else {
                processUnexpected(directBuffer, i2, i3);
            }
        }

        private void streamAfterBeginOrData(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 2:
                    processData(directBuffer, i2, i3);
                    return;
                case EndFW.TYPE_ID /* 3 */:
                    processEnd(directBuffer, i2, i3);
                    return;
                default:
                    processUnexpected(directBuffer, i2, i3);
                    return;
            }
        }

        private void streamAfterEnd(int i, DirectBuffer directBuffer, int i2, int i3) {
            processUnexpected(directBuffer, i2, i3);
        }

        private void streamAfterReplyOrReset(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            if (i == 2) {
                SourceInputStreamFactory.this.dataRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
                SourceInputStreamFactory.this.source.doWindow(SourceInputStreamFactory.this.dataRO.streamId(), i3);
            } else if (i == 3) {
                SourceInputStreamFactory.this.endRO.wrap((DirectBuffer) mutableDirectBuffer, i2, i2 + i3);
                SourceInputStreamFactory.this.source.removeStream(SourceInputStreamFactory.this.endRO.streamId());
                this.streamState = (v1, v2, v3, v4) -> {
                    streamAfterEnd(v1, v2, v3, v4);
                };
            }
        }

        private void processUnexpected(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.frameRO.wrap(directBuffer, i, i + i2);
            processUnexpected(SourceInputStreamFactory.this.frameRO.streamId());
        }

        private void processUnexpected(long j) {
            SourceInputStreamFactory.this.source.doReset(j);
            this.streamState = this::streamAfterReplyOrReset;
        }

        private void processInvalidRequest(int i, String str) {
            this.target = SourceInputStreamFactory.this.rejectTarget;
            long asLong = SourceInputStreamFactory.this.supplyStreamId.getAsLong();
            this.target.doBegin(asLong, 0L, this.correlationId);
            this.target.addThrottle(asLong, this::handleThrottle);
            DirectBuffer unsafeBuffer = new UnsafeBuffer(str.getBytes(StandardCharsets.UTF_8));
            this.target.doData(asLong, unsafeBuffer, 0, unsafeBuffer.capacity());
            this.decoderState = this::decodeHttpBegin;
            this.streamState = this::streamAfterReplyOrReset;
            this.throttleState = (v1, v2, v3, v4) -> {
                throttleSkipNextWindow(v1, v2, v3, v4);
            };
            this.sourceUpdateDeferred = i - unsafeBuffer.capacity();
        }

        private void processBegin(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.beginRO.wrap(directBuffer, i, i + i2);
            this.sourceId = SourceInputStreamFactory.this.beginRO.streamId();
            this.sourceRef = SourceInputStreamFactory.this.beginRO.referenceId();
            this.correlationId = SourceInputStreamFactory.this.beginRO.correlationId();
            this.streamState = (v1, v2, v3, v4) -> {
                streamAfterBeginOrData(v1, v2, v3, v4);
            };
            this.decoderState = this::decodeHttpBegin;
            this.window += 512;
            SourceInputStreamFactory.this.source.doWindow(this.sourceId, 512);
        }

        private void processData(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.dataRO.wrap(directBuffer, i, i + i2);
            OctetsFW payload = SourceInputStreamFactory.this.dataRO.payload();
            int offset = payload.offset() + 1;
            this.window -= payload.length() - 1;
            if (this.window < 0) {
                processUnexpected(directBuffer, i, i2);
                return;
            }
            int limit = payload.limit();
            while (offset < limit) {
                offset = this.decoderState.decode(directBuffer, offset, limit);
            }
        }

        private void processEnd(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.endRO.wrap(directBuffer, i, i + i2);
            long streamId = SourceInputStreamFactory.this.endRO.streamId();
            this.decoderState = (directBuffer2, i3, i4) -> {
                return i3;
            };
            SourceInputStreamFactory.this.source.removeStream(streamId);
            this.target.removeThrottle(this.targetId);
        }

        private int decodeHttpBegin(DirectBuffer directBuffer, int i, int i2) {
            int limitOfBytes = BufferUtil.limitOfBytes(directBuffer, i, i2, SourceInputStreamFactory.CRLFCRLF_BYTES);
            if (limitOfBytes == -1) {
                throw new IllegalStateException("incomplete http headers");
            }
            String[] split = directBuffer.getStringWithoutLengthUtf8(i, limitOfBytes - i).split("\r\n");
            String[] split2 = split[0].split("\\s+");
            if (Pattern.compile("HTTP/1\\.(\\d)").matcher(split2[2]).matches()) {
                URI create = URI.create(split2[1]);
                Map<String, String> decodeHttpHeaders = decodeHttpHeaders(split2, split, create);
                if (decodeHttpHeaders.get(":authority") == null || create.getUserInfo() != null) {
                    processInvalidRequest(limitOfBytes - i, "HTTP/1.1 400 Bad Request\r\n\r\n");
                } else {
                    Optional<Route> resolveTarget = resolveTarget(this.sourceRef, decodeHttpHeaders);
                    if (resolveTarget.isPresent()) {
                        long asLong = SourceInputStreamFactory.this.supplyStreamId.getAsLong();
                        SourceInputStreamFactory.this.correlateNew.accept(asLong, (long) new Correlation(this.correlationId, SourceInputStreamFactory.this.source.routableName(), RouteKind.OUTPUT_ESTABLISHED));
                        Route route = resolveTarget.get();
                        Target target = route.target();
                        target.doHttpBegin(asLong, route.targetRef(), asLong, builder -> {
                            decodeHttpHeaders.forEach((str, str2) -> {
                                builder.item(builder -> {
                                    builder.name(str).value(str2);
                                });
                            });
                        });
                        target.addThrottle(asLong, this::handleThrottle);
                        boolean containsKey = decodeHttpHeaders.containsKey("upgrade");
                        if (containsKey) {
                            this.decoderState = this::decodeHttpDataAfterUpgrade;
                        } else {
                            this.contentRemaining = Integer.parseInt(decodeHttpHeaders.getOrDefault("content-length", "0"));
                            this.decoderState = this::decodeHttpData;
                        }
                        if (containsKey || this.contentRemaining != 0) {
                            this.target = target;
                            this.targetId = asLong;
                            this.throttleState = (v1, v2, v3, v4) -> {
                                throttleNextWindow(v1, v2, v3, v4);
                            };
                            this.sourceUpdateDeferred = limitOfBytes - i;
                        } else {
                            target.doHttpEnd(asLong);
                            SourceInputStreamFactory.this.source.doWindow(this.sourceId, i2 - i);
                            this.target = target;
                            this.targetId = asLong;
                            this.throttleState = (v1, v2, v3, v4) -> {
                                throttleSkipNextWindow(v1, v2, v3, v4);
                            };
                        }
                    } else {
                        processInvalidRequest(limitOfBytes - i, "HTTP/1.1 404 Not Found\r\n\r\n");
                    }
                }
            } else {
                processInvalidRequest(limitOfBytes - i, "HTTP/1.1 505 HTTP Version Not Supported\r\n\r\n");
            }
            return limitOfBytes;
        }

        private Map<String, String> decodeHttpHeaders(String[] strArr, String[] strArr2, URI uri) {
            String authority = uri.getAuthority();
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            linkedHashMap.put(":scheme", "http");
            linkedHashMap.put(":method", strArr[0]);
            linkedHashMap.put(":path", uri.getPath());
            if (authority != null) {
                linkedHashMap.put(":authority", authority);
            }
            Pattern compile = Pattern.compile("([^\\s:]+)\\s*:\\s*(.*)");
            for (int i = 1; i < strArr2.length; i++) {
                Matcher matcher = compile.matcher(strArr2[i]);
                if (!matcher.matches()) {
                    throw new IllegalStateException("illegal http header syntax");
                }
                String lowerCase = matcher.group(1).toLowerCase();
                String group = matcher.group(2);
                if (!"host".equals(lowerCase)) {
                    linkedHashMap.put(lowerCase, group);
                } else if (authority == null) {
                    linkedHashMap.put(":authority", group);
                }
            }
            return linkedHashMap;
        }

        private int decodeHttpData(DirectBuffer directBuffer, int i, int i2) {
            int min = Math.min(i2 - i, this.contentRemaining);
            this.target.doHttpData(this.targetId, directBuffer, i, min);
            this.contentRemaining -= min;
            if (this.contentRemaining == 0) {
                this.target.doHttpEnd(this.targetId);
                if (this.sourceUpdateDeferred != 0) {
                    SourceInputStreamFactory.this.source.doWindow(this.sourceId, this.sourceUpdateDeferred);
                    this.sourceUpdateDeferred = 0;
                }
                this.throttleState = (v1, v2, v3, v4) -> {
                    throttleSkipNextWindow(v1, v2, v3, v4);
                };
            }
            return i + min;
        }

        private int decodeHttpDataAfterUpgrade(DirectBuffer directBuffer, int i, int i2) {
            this.target.doData(this.targetId, directBuffer, i, i2 - i);
            return i2;
        }

        private int decodeHttpEnd(DirectBuffer directBuffer, int i, int i2) {
            this.target.doHttpEnd(this.targetId);
            return i2;
        }

        private Optional<Route> resolveTarget(long j, Map<String, String> map) {
            List list = (List) SourceInputStreamFactory.this.supplyRoutes.apply(j);
            return list.stream().filter(Route.headersMatch(map)).findFirst();
        }

        private void handleThrottle(int i, MutableDirectBuffer mutableDirectBuffer, int i2, int i3) {
            this.throttleState.onMessage(i, mutableDirectBuffer, i2, i3);
        }

        private void throttleSkipNextWindow(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    processReset(directBuffer, i2, i3);
                    return;
                case 1073741826:
                    processSkipNextWindow(directBuffer, i2, i3);
                    return;
                default:
                    return;
            }
        }

        private void throttleNextWindow(int i, DirectBuffer directBuffer, int i2, int i3) {
            switch (i) {
                case 1073741825:
                    processReset(directBuffer, i2, i3);
                    return;
                case 1073741826:
                    processNextWindow(directBuffer, i2, i3);
                    return;
                default:
                    return;
            }
        }

        private void processSkipNextWindow(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.windowRO.wrap(directBuffer, i, i + i2);
            this.throttleState = (v1, v2, v3, v4) -> {
                throttleNextWindow(v1, v2, v3, v4);
            };
        }

        private void processNextWindow(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.windowRO.wrap(directBuffer, i, i + i2);
            int update = SourceInputStreamFactory.this.windowRO.update();
            if (this.sourceUpdateDeferred != 0) {
                update += this.sourceUpdateDeferred;
                this.sourceUpdateDeferred = 0;
            }
            this.window += update;
            SourceInputStreamFactory.this.source.doWindow(this.sourceId, update + SourceInputStreamFactory.framing(update));
        }

        private void processReset(DirectBuffer directBuffer, int i, int i2) {
            SourceInputStreamFactory.this.resetRO.wrap(directBuffer, i, i + i2);
            SourceInputStreamFactory.this.source.doReset(this.sourceId);
        }
    }

    public SourceInputStreamFactory(Source source, LongFunction<List<Route>> longFunction, LongSupplier longSupplier, Target target, LongObjectBiConsumer<Correlation> longObjectBiConsumer) {
        this.source = source;
        this.supplyRoutes = longFunction;
        this.supplyStreamId = longSupplier;
        this.rejectTarget = target;
        this.correlateNew = longObjectBiConsumer;
    }

    public MessageHandler newStream() {
        SourceInputStream sourceInputStream = new SourceInputStream();
        sourceInputStream.getClass();
        return (i, mutableDirectBuffer, i2, i3) -> {
            sourceInputStream.handleStream(i, mutableDirectBuffer, i2, i3);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int framing(int i) {
        return 0;
    }
}
