package org.reaktivity.nukleus.http_cache.internal.stream;

import java.util.Objects;
import org.agrona.DirectBuffer;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.http_cache.internal.HttpCacheConfiguration;
import org.reaktivity.nukleus.http_cache.internal.proxy.cache.CacheUtils;
import org.reaktivity.nukleus.http_cache.internal.proxy.cache.SurrogateControl;
import org.reaktivity.nukleus.http_cache.internal.proxy.request.CacheRefreshRequest;
import org.reaktivity.nukleus.http_cache.internal.proxy.request.CacheableRequest;
import org.reaktivity.nukleus.http_cache.internal.proxy.request.Request;
import org.reaktivity.nukleus.http_cache.internal.stream.BudgetManager;
import org.reaktivity.nukleus.http_cache.internal.stream.util.HttpHeaders;
import org.reaktivity.nukleus.http_cache.internal.stream.util.HttpHeadersUtil;
import org.reaktivity.nukleus.http_cache.internal.types.HttpHeaderFW;
import org.reaktivity.nukleus.http_cache.internal.types.ListFW;
import org.reaktivity.nukleus.http_cache.internal.types.OctetsFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.DataFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.EndFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.HttpBeginExFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.http_cache.internal.types.stream.WindowFW;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/reaktivity/nukleus/http_cache/internal/stream/ProxyConnectReplyStream.class */
public final class ProxyConnectReplyStream {
    private final ProxyStreamFactory streamFactory;
    private MessageConsumer streamState = this::beforeBegin;
    private final MessageConsumer connectReplyThrottle;
    private final long connectRouteId;
    private final long connectReplyStreamId;
    private Request streamCorrelation;
    private int acceptReplyBudget;
    private int connectReplyBudget;
    private long groupId;
    private int padding;
    private boolean endDeferred;
    private boolean cached;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProxyConnectReplyStream(ProxyStreamFactory proxyStreamFactory, MessageConsumer messageConsumer, long j, long j2) {
        this.streamFactory = proxyStreamFactory;
        this.connectReplyThrottle = messageConsumer;
        this.connectRouteId = j;
        this.connectReplyStreamId = j2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleStream(int i, DirectBuffer directBuffer, int i2, int i3) {
        this.streamState.accept(i, directBuffer, i2, i3);
    }

    private void beforeBegin(int i, DirectBuffer directBuffer, int i2, int i3) {
        if (i == 1) {
            handleBegin(this.streamFactory.beginRO.wrap(directBuffer, i2, i2 + i3));
        } else {
            this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
        }
    }

    private void handleBegin(BeginFW beginFW) {
        long streamId = beginFW.streamId();
        this.streamCorrelation = (Request) this.streamFactory.correlations.remove(streamId);
        OctetsFW extension = this.streamFactory.beginRO.extension();
        if (this.streamCorrelation == null || extension.sizeof() <= 0) {
            this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, 0L);
            return;
        }
        HttpBeginExFW httpBeginExFW = this.streamFactory.httpBeginExRO;
        Objects.requireNonNull(httpBeginExFW);
        HttpBeginExFW httpBeginExFW2 = (HttpBeginExFW) extension.get(httpBeginExFW::wrap);
        if (HttpCacheConfiguration.DEBUG) {
            System.out.printf("[%016x] CONNECT %016x %s [received response]\n", Long.valueOf(System.currentTimeMillis()), Long.valueOf(streamId), HttpHeadersUtil.getHeader(httpBeginExFW2.headers(), HttpHeaders.STATUS));
        }
        ListFW<HttpHeaderFW> headers = httpBeginExFW2.headers();
        switch (this.streamCorrelation.getType()) {
            case PROXY:
                doProxyBegin(headers);
                return;
            case INITIAL_REQUEST:
                handleInitialRequest(headers);
                return;
            case CACHE_REFRESH:
                handleCacheRefresh(headers);
                return;
            default:
                throw new RuntimeException("Not implemented");
        }
    }

    private void handleCacheRefresh(ListFW<HttpHeaderFW> listFW) {
        if (HttpHeadersUtil.retry(listFW) && ((CacheableRequest) this.streamCorrelation).attempts() < 3) {
            retryCacheableRequest();
            return;
        }
        CacheRefreshRequest cacheRefreshRequest = (CacheRefreshRequest) this.streamCorrelation;
        if (cacheRefreshRequest.storeResponseHeaders(listFW, this.streamFactory.cache, this.streamFactory.responseBufferPool)) {
            this.streamState = this::handleCacheRefresh;
            this.streamFactory.writer.doWindow(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, 0L, 32767, 0, 0L);
        } else {
            cacheRefreshRequest.purge();
            this.streamState = this::reset;
            this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
        }
    }

    private void reset(int i, DirectBuffer directBuffer, int i2, int i3) {
    }

    private void handleCacheRefresh(int i, DirectBuffer directBuffer, int i2, int i3) {
        CacheableRequest cacheableRequest = (CacheableRequest) this.streamCorrelation;
        switch (i) {
            case 2:
                if (cacheableRequest.storeResponseData(this.streamFactory.cache, this.streamFactory.dataRO.wrap(directBuffer, i2, i2 + i3), this.streamFactory.responseBufferPool)) {
                    this.streamFactory.writer.doWindow(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, 0L, i3, 0, 0L);
                    return;
                }
                cacheableRequest.purge();
                this.streamState = this::reset;
                this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
                return;
            case 3:
                cacheableRequest.cache(this.streamFactory.endRO.wrap(directBuffer, i2, i2 + i3), this.streamFactory.cache);
                return;
            case 4:
            default:
                cacheableRequest.purge();
                return;
        }
    }

    private void handleInitialRequest(ListFW<HttpHeaderFW> listFW) {
        if (HttpHeadersUtil.retry(listFW) && ((CacheableRequest) this.streamCorrelation).attempts() < 3) {
            retryCacheableRequest();
            return;
        }
        int surrogateFreshnessExtension = SurrogateControl.getSurrogateFreshnessExtension(listFW);
        boolean isCacheableResponse = CacheUtils.isCacheableResponse(listFW);
        if (surrogateFreshnessExtension > 0 && isCacheableResponse) {
            handleEdgeArchSync(listFW, surrogateFreshnessExtension);
        } else if (isCacheableResponse) {
            handleCacheableResponse(listFW);
        } else {
            this.streamCorrelation.purge();
            doProxyBegin(listFW);
        }
    }

    private void handleEdgeArchSync(ListFW<HttpHeaderFW> listFW, int i) {
        CacheableRequest cacheableRequest = (CacheableRequest) this.streamCorrelation;
        if (!cacheableRequest.storeResponseHeaders(listFW, this.streamFactory.cache, this.streamFactory.responseBufferPool)) {
            cacheableRequest.purge();
            doProxyBegin(listFW);
            return;
        }
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        if (HttpCacheConfiguration.DEBUG) {
            System.out.printf("[%016x] ACCEPT %016x %s [sent cacheable response]\n", Long.valueOf(System.currentTimeMillis()), Long.valueOf(acceptReplyId), HttpHeadersUtil.getHeader(listFW, HttpHeaders.STATUS));
        }
        this.streamCorrelation.setThrottle(this::onThrottleMessageWhenProxying);
        this.streamFactory.writer.doHttpResponseWithUpdatedCacheControl(acceptReply, acceptRouteId, acceptReplyId, this.streamFactory.cacheControlParser, listFW, i, cacheableRequest.etag(), false);
        this.streamFactory.counters.responses.getAsLong();
        this.streamFactory.writer.doHttpPushPromise(cacheableRequest, cacheableRequest, listFW, i, cacheableRequest.etag());
        this.streamFactory.counters.promises.getAsLong();
        this.streamState = this::handleCacheableRequestResponse;
    }

    private void retryCacheableRequest() {
        CacheableRequest cacheableRequest = (CacheableRequest) this.streamCorrelation;
        cacheableRequest.incAttempts();
        long applyAsLong = cacheableRequest.supplyInitialId().applyAsLong(this.connectRouteId);
        MessageConsumer supplyReceiver = this.streamFactory.router.supplyReceiver(applyAsLong);
        long applyAsLong2 = cacheableRequest.supplyReplyId().applyAsLong(applyAsLong);
        this.streamFactory.correlations.put(applyAsLong2, cacheableRequest);
        ListFW<HttpHeaderFW> requestHeaders = cacheableRequest.getRequestHeaders(this.streamFactory.requestHeadersRO);
        String etag = cacheableRequest.etag();
        if (HttpCacheConfiguration.DEBUG) {
            System.out.printf("[%016x] CONNECT %016x %s [retry cacheable request]\n", Long.valueOf(System.currentTimeMillis()), Long.valueOf(applyAsLong2), HttpHeadersUtil.getRequestURL(requestHeaders));
        }
        this.streamFactory.writer.doHttpRequest(supplyReceiver, this.connectRouteId, applyAsLong, builder -> {
            requestHeaders.forEach(httpHeaderFW -> {
                builder.item(builder -> {
                    builder.name(httpHeaderFW.name()).value(httpHeaderFW.value());
                });
            });
            if (cacheableRequest instanceof CacheRefreshRequest) {
                builder.item(builder -> {
                    builder.name(HttpHeaders.IF_NONE_MATCH).value(etag);
                });
            }
        });
        this.streamFactory.writer.doHttpEnd(supplyReceiver, this.connectRouteId, applyAsLong, this.streamFactory.supplyTrace.getAsLong());
        this.streamFactory.counters.requestsRetry.getAsLong();
    }

    private void handleCacheableResponse(ListFW<HttpHeaderFW> listFW) {
        CacheableRequest cacheableRequest = (CacheableRequest) this.streamCorrelation;
        if (!cacheableRequest.storeResponseHeaders(listFW, this.streamFactory.cache, this.streamFactory.responseBufferPool)) {
            cacheableRequest.purge();
        }
        doProxyBegin(listFW);
        this.streamState = this::handleCacheableRequestResponse;
    }

    private void handleCacheableRequestResponse(int i, DirectBuffer directBuffer, int i2, int i3) {
        CacheableRequest cacheableRequest = (CacheableRequest) this.streamCorrelation;
        switch (i) {
            case 2:
                if (!cacheableRequest.storeResponseData(this.streamFactory.cache, this.streamFactory.dataRO.wrap(directBuffer, i2, i2 + i3), this.streamFactory.responseBufferPool)) {
                    cacheableRequest.purge();
                    break;
                }
                break;
            case 3:
                this.cached = cacheableRequest.cache(this.streamFactory.endRO.wrap(directBuffer, i2, i2 + i3), this.streamFactory.cache);
                break;
            case 4:
            default:
                cacheableRequest.purge();
                break;
        }
        onStreamMessageWhenProxying(i, directBuffer, i2, i3);
    }

    private void doProxyBegin(ListFW<HttpHeaderFW> listFW) {
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        if (HttpCacheConfiguration.DEBUG) {
            System.out.printf("[%016x] ACCEPT %016x %s [sent proxy response]\n", Long.valueOf(System.currentTimeMillis()), Long.valueOf(acceptReplyId), HttpHeadersUtil.getHeader(listFW, HttpHeaders.STATUS));
        }
        this.streamCorrelation.setThrottle(this::onThrottleMessageWhenProxying);
        this.streamFactory.writer.doHttpResponse(acceptReply, acceptRouteId, acceptReplyId, builder -> {
            listFW.forEach(httpHeaderFW -> {
                builder.item(builder -> {
                    builder.name(httpHeaderFW.name()).value(httpHeaderFW.value());
                });
            });
        });
        this.streamFactory.counters.responses.getAsLong();
        this.streamState = this::onStreamMessageWhenProxying;
    }

    private void onStreamMessageWhenProxying(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 2:
                onDataWhenProxying(this.streamFactory.dataRO.wrap(directBuffer, i2, i2 + i3));
                return;
            case 3:
                onEndWhenProxying(this.streamFactory.endRO.wrap(directBuffer, i2, i2 + i3));
                return;
            case 4:
                onAbortWhenProxying(this.streamFactory.abortRO.wrap(directBuffer, i2, i2 + i3));
                return;
            default:
                this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
                return;
        }
    }

    private void onThrottleMessageWhenProxying(int i, DirectBuffer directBuffer, int i2, int i3) {
        switch (i) {
            case 1073741825:
                onResetWhenProxying(this.streamFactory.resetRO.wrap(directBuffer, i2, i2 + i3));
                return;
            case 1073741826:
                onWindowWhenProxying(this.streamFactory.windowRO.wrap(directBuffer, i2, i2 + i3));
                return;
            default:
                return;
        }
    }

    private void onDataWhenProxying(DataFW dataFW) {
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        this.connectReplyBudget -= dataFW.length() + dataFW.padding();
        if (this.connectReplyBudget < 0) {
            this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
            return;
        }
        OctetsFW payload = dataFW.payload();
        this.acceptReplyBudget -= payload.sizeof() + dataFW.padding();
        if (!$assertionsDisabled && this.acceptReplyBudget < 0) {
            throw new AssertionError();
        }
        this.streamFactory.writer.doHttpData(acceptReply, acceptRouteId, acceptReplyId, dataFW.groupId(), dataFW.padding(), payload.buffer(), payload.offset(), payload.sizeof());
    }

    private void onEndWhenProxying(EndFW endFW) {
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        this.streamFactory.budgetManager.closing(this.groupId, acceptReplyId, this.connectReplyBudget);
        if (this.streamFactory.budgetManager.hasUnackedBudget(this.groupId, acceptReplyId)) {
            this.endDeferred = true;
            return;
        }
        long trace = endFW.trace();
        this.streamFactory.budgetManager.closed(BudgetManager.StreamKind.PROXY, this.groupId, acceptReplyId);
        this.streamFactory.writer.doHttpEnd(acceptReply, acceptRouteId, acceptReplyId, trace);
    }

    private void onAbortWhenProxying(AbortFW abortFW) {
        long trace = abortFW.trace();
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        this.streamFactory.budgetManager.closed(BudgetManager.StreamKind.PROXY, this.groupId, acceptReplyId);
        this.streamFactory.writer.doAbort(acceptReply, acceptRouteId, acceptReplyId, trace);
    }

    private void onWindowWhenProxying(WindowFW windowFW) {
        long streamId = windowFW.streamId();
        int credit = windowFW.credit();
        this.acceptReplyBudget += credit;
        this.padding = windowFW.padding();
        this.groupId = windowFW.groupId();
        this.streamFactory.budgetManager.window(BudgetManager.StreamKind.PROXY, this.groupId, streamId, credit, this::budgetAvailableWhenProxying);
        if (!this.endDeferred || this.streamFactory.budgetManager.hasUnackedBudget(this.groupId, streamId)) {
            return;
        }
        long acceptRouteId = this.streamCorrelation.acceptRouteId();
        long acceptReplyId = this.streamCorrelation.acceptReplyId();
        MessageConsumer acceptReply = this.streamCorrelation.acceptReply();
        this.streamFactory.budgetManager.closed(BudgetManager.StreamKind.PROXY, this.groupId, acceptReplyId);
        this.streamFactory.writer.doHttpEnd(acceptReply, acceptRouteId, acceptReplyId, 0L);
    }

    private void onResetWhenProxying(ResetFW resetFW) {
        this.streamFactory.budgetManager.closed(BudgetManager.StreamKind.PROXY, this.groupId, this.streamCorrelation.acceptReplyId());
        this.streamFactory.writer.doReset(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, this.streamFactory.supplyTrace.getAsLong());
        if (this.cached) {
            return;
        }
        this.streamCorrelation.purge();
    }

    private int budgetAvailableWhenProxying(int i) {
        if (this.endDeferred) {
            return i;
        }
        this.connectReplyBudget += i;
        this.streamFactory.writer.doWindow(this.connectReplyThrottle, this.connectRouteId, this.connectReplyStreamId, 0L, i, this.padding, this.groupId);
        return 0;
    }

    static {
        $assertionsDisabled = !ProxyConnectReplyStream.class.desiredAssertionStatus();
    }
}
