package org.opensearch.migrations.replay;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import lombok.NonNull;
import org.opensearch.migrations.replay.Accumulation;
import org.opensearch.migrations.replay.HttpMessageAndTimestamp;
import org.opensearch.migrations.replay.RequestResponsePacketPair;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.traffic.expiration.BehavioralPolicy;
import org.opensearch.migrations.replay.traffic.expiration.ExpiringTrafficStreamMap;
import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.class */
public class CapturedTrafficToHttpTransactionAccumulator {
    // Class-wide SLF4J logger; assigned in the static initializer at the bottom of the class.
    private static final Logger log;
    // Granularity handed to the ExpiringTrafficStreamMap constructor; assigned in the static initializer.
    public static final Duration EXPIRATION_GRANULARITY;
    // Accumulations for connections that are still being tracked; entries are created in accept()
    // and removed once a close observation has been processed.
    private final ExpiringTrafficStreamMap liveStreams;
    // Wrapper around the caller-supplied AccumulationCallbacks; every notification goes through it.
    private final SpanWrappingAccumulationCallbacks listener;
    // Incremented when a new request starts accumulating; decremented when a request is dropped mid-read.
    private final AtomicInteger requestCounter = new AtomicInteger();
    // Incremented when a read arrives while a response was being accumulated (the connection was reused).
    private final AtomicInteger reusedKeepAliveCounter = new AtomicInteger();
    // Incremented for every close observation that is processed.
    private final AtomicInteger closedConnectionCounter = new AtomicInteger();
    // Incremented for every connection-exception observation that is processed.
    private final AtomicInteger exceptionConnectionCounter = new AtomicInteger();
    // Incremented each time the expiring map reports an expired accumulation.
    private final AtomicInteger connectionsExpiredCounter = new AtomicInteger();
    // Incremented once per accumulation that is still live when close() is called.
    private final AtomicInteger requestsTerminatedUponAccumulatorCloseCounter = new AtomicInteger();
    // Gate for the explicit assertion checks in this class: when true, they are skipped.
    // Computed from Class.desiredAssertionStatus() in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator$CONNECTION_STATUS.class */
    /**
     * Result of applying a single observation to an accumulation, telling the caller whether
     * the connection should continue to be tracked.
     */
    public enum CONNECTION_STATUS {
        // The connection stays tracked; further observations may follow.
        ALIVE,
        // A close observation was handled; accept() stops iterating and removes the connection from liveStreams.
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator$SpanWrappingAccumulationCallbacks.class */
    /**
     * Thin adapter in front of the caller-supplied {@link AccumulationCallbacks}. Each method validates
     * its non-null arguments, translates the accumulator-side context objects into the enclosing-scope
     * contexts the underlying callbacks take, and closes the request/response contexts at the points
     * shown below.
     */
    public static class SpanWrappingAccumulationCallbacks {
        private final AccumulationCallbacks underlying;

        /**
         * @param accumulationCallbacks the callbacks that every notification is forwarded to
         */
        public SpanWrappingAccumulationCallbacks(AccumulationCallbacks accumulationCallbacks) {
            this.underlying = accumulationCallbacks;
        }

        /**
         * Closes the request-accumulation context, forwards the completed request to the underlying
         * callbacks, and wraps the continuation they return so that the response context is closed
         * before that continuation runs.
         *
         * @param iRequestAccumulationContext context covering the accumulation of the request
         * @param httpMessageAndTimestamp the fully accumulated request; must not be null
         * @return a consumer to invoke with the finished request/response pair
         * @throws NullPointerException if {@code httpMessageAndTimestamp} is null
         */
        public Consumer<RequestResponsePacketPair> onRequestReceived(IReplayContexts.IRequestAccumulationContext iRequestAccumulationContext, @NonNull HttpMessageAndTimestamp httpMessageAndTimestamp) {
            Objects.requireNonNull(httpMessageAndTimestamp, "request is marked non-null but is null");
            iRequestAccumulationContext.close();
            Consumer<RequestResponsePacketPair> delegate = underlying.onRequestReceived(
                    (IReplayContexts.IReplayerHttpTransactionContext) iRequestAccumulationContext.getLogicalEnclosingScope(),
                    httpMessageAndTimestamp);
            return completedPair -> {
                completedPair.getResponseContext().close();
                delegate.accept(completedPair);
            };
        }

        /**
         * Forwards a connection-close notification, passing along the accumulation's reset count,
         * channel context and starting source request index.
         *
         * @throws NullPointerException if {@code accumulation}, {@code instant} or {@code list} is null
         */
        public void onConnectionClose(@NonNull Accumulation accumulation, RequestResponsePacketPair.ReconstructionStatus reconstructionStatus, @NonNull Instant instant, @NonNull List<ITrafficStreamKey> list) {
            Objects.requireNonNull(accumulation, "accum is marked non-null but is null");
            Objects.requireNonNull(instant, "when is marked non-null but is null");
            Objects.requireNonNull(list, "trafficStreamKeysBeingHeld is marked non-null but is null");
            underlying.onConnectionClose(
                    accumulation.numberOfResets.get(),
                    (IReplayContexts.IChannelKeyContext) accumulation.trafficChannelKey.getTrafficStreamsContext().getLogicalEnclosingScope(),
                    accumulation.startingSourceRequestIndex,
                    reconstructionStatus,
                    instant,
                    list);
        }

        /**
         * Forwards an expiration notification using the channel context that encloses the given
         * traffic-streams lifecycle context.
         *
         * @throws NullPointerException if {@code list} is null
         */
        public void onTrafficStreamsExpired(RequestResponsePacketPair.ReconstructionStatus reconstructionStatus, IReplayContexts.ITrafficStreamsLifecycleContext iTrafficStreamsLifecycleContext, @NonNull List<ITrafficStreamKey> list) {
            Objects.requireNonNull(list, "trafficStreamKeysBeingHeld is marked non-null but is null");
            IReplayContexts.IChannelKeyContext channelContext =
                    (IReplayContexts.IChannelKeyContext) iTrafficStreamsLifecycleContext.getLogicalEnclosingScope();
            underlying.onTrafficStreamsExpired(reconstructionStatus, channelContext, list);
        }

        /**
         * Reports that the traffic stream identified by the key contributed nothing that is being held.
         *
         * @throws NullPointerException if {@code iTrafficStreamKey} is null
         */
        public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey iTrafficStreamKey) {
            Objects.requireNonNull(iTrafficStreamKey, "tsk is marked non-null but is null");
            underlying.onTrafficStreamIgnored(iTrafficStreamKey.getTrafficStreamsContext());
        }
    }

    /**
     * Builds a single-line, space-separated summary of the request, reuse, close, expiration and
     * shutdown counters.
     *
     * @return the counters rendered as {@code "name: value"} pairs joined by single spaces
     */
    public String getStatsString() {
        StringJoiner stats = new StringJoiner(" ");
        stats.add("requests: " + requestCounter.get());
        stats.add("reused: " + reusedKeepAliveCounter.get());
        stats.add("closed: " + closedConnectionCounter.get());
        stats.add("expired: " + connectionsExpiredCounter.get());
        stats.add("hardClosedAtShutdown: " + requestsTerminatedUponAccumulatorCloseCounter.get());
        return stats.toString();
    }

    /**
     * Creates an accumulator whose live connections are kept in an {@link ExpiringTrafficStreamMap}.
     *
     * @param duration passed as the first argument to the expiring map
     * @param str text returned by the policy's {@code appendageToDescribeHowToSetMinimumGuaranteedLifetime()}
     * @param accumulationCallbacks callbacks to notify; wrapped in a {@link SpanWrappingAccumulationCallbacks}
     */
    public CapturedTrafficToHttpTransactionAccumulator(Duration duration, final String str, AccumulationCallbacks accumulationCallbacks) {
        BehavioralPolicy expirationPolicy = new BehavioralPolicy() {
            @Override
            public String appendageToDescribeHowToSetMinimumGuaranteedLifetime() {
                return str;
            }

            // Count the expiration, then flush whatever was accumulated as EXPIRED_PREMATURELY.
            @Override
            public void onExpireAccumulation(String unusedId, Accumulation accumulation) {
                CapturedTrafficToHttpTransactionAccumulator.this.connectionsExpiredCounter.incrementAndGet();
                CapturedTrafficToHttpTransactionAccumulator.log.atTrace()
                        .setMessage(() -> "firing accumulation for accum=[" + accumulation.getRrPair().getBeginningTrafficStreamKey() + "]=" + accumulation)
                        .log();
                CapturedTrafficToHttpTransactionAccumulator.this.fireAccumulationsCallbacksAndClose(
                        accumulation, RequestResponsePacketPair.ReconstructionStatus.EXPIRED_PREMATURELY);
            }
        };
        this.liveStreams = new ExpiringTrafficStreamMap(duration, EXPIRATION_GRANULARITY, expirationPolicy);
        this.listener = new SpanWrappingAccumulationCallbacks(accumulationCallbacks);
    }

    /**
     * @return the connection-creation count as reported by the live-streams map
     */
    public int numberOfConnectionsCreated() {
        return liveStreams.numberOfConnectionsCreated();
    }

    /**
     * @return the current value of the reused keep-alive counter
     */
    public int numberOfRequestsOnReusedConnections() {
        return reusedKeepAliveCounter.get();
    }

    /**
     * @return the current value of the closed-connection counter
     */
    public int numberOfConnectionsClosed() {
        return closedConnectionCounter.get();
    }

    /**
     * @return the current value of the connection-exception counter
     */
    public int numberOfConnectionExceptions() {
        return exceptionConnectionCounter.get();
    }

    /**
     * @return the current value of the expired-connections counter
     */
    public int numberOfConnectionsExpired() {
        return connectionsExpiredCounter.get();
    }

    /**
     * @return the number of accumulations that were still live when {@code close()} flushed them
     */
    public int numberOfRequestsTerminatedUponAccumulatorClose() {
        return requestsTerminatedUponAccumulatorCloseCounter.get();
    }

    /**
     * Renders a short description of a traffic stream for trace logging: node id, connection id,
     * stream index and the timestamp of its first observation (or {@code [None]} when it has none).
     */
    private static String summarizeTrafficStream(TrafficStream trafficStream) {
        String firstTimestamp = trafficStream.getSubStreamList().stream()
                .findFirst()
                .map(TrafficObservation::getTs)
                .map(TrafficStreamUtils::instantFromProtoTimestamp)
                .map(Instant::toString)
                .orElse("[None]");
        return "nodeId: " + trafficStream.getNodeId()
                + " connId: " + trafficStream.getConnectionId()
                + " index: " + TrafficStreamUtils.getTrafficStreamIndex(trafficStream)
                + " firstTimestamp: " + firstTimestamp;
    }

    /**
     * Feeds every observation of one captured traffic stream into the accumulation for its connection.
     *
     * <p>The accumulation is looked up (or created) in {@code liveStreams}. Observations are applied in
     * order until one reports {@link CONNECTION_STATUS#CLOSED}, at which point the connection is removed
     * from the map and the remaining observations are skipped. Afterwards, if a request/response pair is
     * in progress the stream key is held by that pair; otherwise, unless the stream ended with a close
     * observation, the listener is told that the stream was ignored.
     *
     * <p>A stream with zero observations is treated as not ending with a close, rather than failing on an
     * out-of-range lookup of its last observation; the assertion below already lists an empty stream as
     * a valid way to reach the ignore notification.
     *
     * @param iTrafficStreamWithKey the captured stream together with its key
     */
    public void accept(ITrafficStreamWithKey iTrafficStreamWithKey) {
        final TrafficStream stream = iTrafficStreamWithKey.getStream();
        log.atTrace().setMessage(() -> "Got trafficStream: " + summarizeTrafficStream(stream)).log();
        final String nodeId = stream.getNodeId();
        final String connectionId = stream.getConnectionId();
        final ITrafficStreamKey key = iTrafficStreamWithKey.getKey();
        final Accumulation accum = this.liveStreams.getOrCreateWithoutExpiration(
                key, unusedKey -> createInitialAccumulation(iTrafficStreamWithKey));

        final int observationCount = stream.getSubStreamCount();
        for (int i = 0; i < observationCount; i++) {
            TrafficObservation observation = (TrafficObservation) stream.getSubStreamList().get(i);
            if (CONNECTION_STATUS.CLOSED == addObservationToAccumulation(accum, key, observation)) {
                log.atInfo()
                        .setMessage(() -> "Connection terminated: removing " + nodeId + ":" + connectionId + " from liveStreams map")
                        .log();
                this.liveStreams.remove(nodeId, connectionId);
                break;
            }
        }

        if (accum.hasRrPair()) {
            // The in-progress pair takes ownership of this stream's key.
            accum.getRrPair().holdTrafficStream(key);
            return;
        }
        // Only inspect the last observation when there is one; an empty stream cannot end with a close.
        boolean endsWithClose = observationCount > 0 && stream.getSubStream(observationCount - 1).hasClose();
        if (endsWithClose) {
            return;
        }
        if (!$assertionsDisabled && accum.state != Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK && accum.state != Accumulation.State.IGNORING_LAST_REQUEST && observationCount != 0) {
            throw new AssertionError();
        }
        this.listener.onTrafficStreamIgnored(key);
    }

    /**
     * Creates the accumulation for a connection seen for the first time. Logs a warning when the
     * stream claims index 0 yet also reports prior requests or an unterminated read, since those
     * values contradict each other.
     */
    private Accumulation createInitialAccumulation(ITrafficStreamWithKey iTrafficStreamWithKey) {
        final TrafficStream stream = iTrafficStreamWithKey.getStream();
        final ITrafficStreamKey key = iTrafficStreamWithKey.getKey();
        boolean inconsistentFirstStream = key.getTrafficStreamIndex() == 0
                && (stream.getPriorRequestsReceived() > 0 || stream.getLastObservationWasUnterminatedRead());
        if (inconsistentFirstStream) {
            log.atWarn().setMessage(() -> "Encountered a TrafficStream object with inconsistent values between the prior request count (" + stream.getPriorRequestsReceived() + ", lastObservationWasUnterminatedRead (" + stream.getLastObservationWasUnterminatedRead() + ") and the index (" + key.getTrafficStreamIndex() + ").  Traffic Observations will be ignored until Reads after the next EndOfMessage are encountered.   Full stream object=" + stream).log();
        }
        return new Accumulation(iTrafficStreamWithKey.getKey(), stream);
    }

    /**
     * Applies one observation to an accumulation. Old map entries are expired using the observation's
     * timestamp, then the handlers are tried in a fixed order (close/exception, skip state, read state,
     * write state); the first one that produces a status wins. If none does, a warning is logged and
     * the connection is reported as still alive.
     *
     * @throws NullPointerException if {@code accumulation} or {@code iTrafficStreamKey} is null
     */
    public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accumulation, @NonNull ITrafficStreamKey iTrafficStreamKey, TrafficObservation trafficObservation) {
        Objects.requireNonNull(accumulation, "accum is marked non-null but is null");
        Objects.requireNonNull(iTrafficStreamKey, "trafficStreamKey is marked non-null but is null");
        log.atTrace()
                .setMessage("{}")
                .addArgument(() -> "Adding observation: " + trafficObservation + " with state=" + accumulation.state)
                .log();
        Instant observedAt = TrafficStreamUtils.instantFromProtoTimestamp(trafficObservation.getTs());
        this.liveStreams.expireOldEntries(iTrafficStreamKey, accumulation, observedAt);

        // Each later handler is consulted only when the earlier ones declined the observation.
        Optional<CONNECTION_STATUS> outcome =
                handleCloseObservationThatAffectEveryState(accumulation, trafficObservation, iTrafficStreamKey, observedAt);
        if (!outcome.isPresent()) {
            outcome = handleObservationForSkipState(accumulation, trafficObservation);
        }
        if (!outcome.isPresent()) {
            outcome = handleObservationForReadState(accumulation, trafficObservation, iTrafficStreamKey, observedAt);
        }
        if (!outcome.isPresent()) {
            outcome = handleObservationForWriteState(accumulation, trafficObservation, iTrafficStreamKey, observedAt);
        }
        if (outcome.isPresent()) {
            return outcome.get();
        }
        log.atWarn()
                .setMessage(() -> "unaccounted for observation type " + trafficObservation + " for " + accumulation.trafficChannelKey)
                .log();
        return CONNECTION_STATUS.ALIVE;
    }

    /**
     * Handles observations while the accumulation is in one of its two "skipping" states.
     *
     * <p>In IGNORING_LAST_REQUEST every observation is consumed; a write, write segment or
     * end-of-message moves the state to WAITING_FOR_NEXT_READ_CHUNK, and a dropped-request observation
     * resets the accumulation. In WAITING_FOR_NEXT_READ_CHUNK anything other than a read or read
     * segment is consumed, while a read flips the state to ACCUMULATING_READS and lets the caller's
     * next handler process it.
     *
     * @return ALIVE when the observation was consumed here, empty when another handler should take it
     */
    private Optional<CONNECTION_STATUS> handleObservationForSkipState(Accumulation accumulation, TrafficObservation trafficObservation) {
        if (!$assertionsDisabled && trafficObservation.hasClose()) {
            throw new AssertionError("close will be handled earlier in handleCloseObservationThatAffectEveryState");
        }
        if (accumulation.state == Accumulation.State.IGNORING_LAST_REQUEST) {
            if (trafficObservation.hasWrite() || trafficObservation.hasWriteSegment() || trafficObservation.hasEndOfMessageIndicator()) {
                accumulation.state = Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK;
            } else if (trafficObservation.hasRequestDropped()) {
                handleDroppedRequestForAccumulation(accumulation);
            }
            return Optional.of(CONNECTION_STATUS.ALIVE);
        }
        if (accumulation.state == Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK) {
            boolean startsNewRequest = trafficObservation.hasRead() || trafficObservation.hasReadSegment();
            if (!startsNewRequest) {
                return Optional.of(CONNECTION_STATUS.ALIVE);
            }
            accumulation.state = Accumulation.State.ACCUMULATING_READS;
        }
        return Optional.empty();
    }

    /**
     * @return the stream keys held by the accumulation's current request/response pair, or an empty
     *         list when it has no pair
     */
    private static List<ITrafficStreamKey> getTrafficStreamsHeldByAccum(Accumulation accumulation) {
        if (!accumulation.hasRrPair()) {
            return List.of();
        }
        return accumulation.getRrPair().trafficStreamKeysBeingHeld;
    }

    /**
     * Handles the two observation types that apply regardless of the accumulation's state.
     *
     * <p>Close: the stream key is held on the (possibly new) pair, an in-progress response is flushed,
     * the close counter is bumped and the listener is notified; the held keys are reported as empty
     * when the flush happened. Connection exception: the key is held, an in-progress response is
     * flushed, the exception counter is bumped and the accumulation is reset for the next request.
     *
     * @return CLOSED for a close, ALIVE for a connection exception, empty for anything else
     * @throws NullPointerException if {@code iTrafficStreamKey} is null
     */
    private Optional<CONNECTION_STATUS> handleCloseObservationThatAffectEveryState(Accumulation accumulation, TrafficObservation trafficObservation, @NonNull ITrafficStreamKey iTrafficStreamKey, Instant instant) {
        Objects.requireNonNull(iTrafficStreamKey, "trafficStreamKey is marked non-null but is null");
        Instant originTimestamp = TrafficStreamUtils.instantFromProtoTimestamp(trafficObservation.getTs());

        if (trafficObservation.hasClose()) {
            accumulation.getOrCreateTransactionPair(iTrafficStreamKey, originTimestamp).holdTrafficStream(iTrafficStreamKey);
            // Capture the held keys before a possible rotation.
            List<ITrafficStreamKey> heldBeforeRotation = getTrafficStreamsHeldByAccum(accumulation);
            boolean rotated = rotateAccumulationIfNecessary(iTrafficStreamKey.getConnectionId(), accumulation);
            List<ITrafficStreamKey> keysToReport = rotated ? List.of() : heldBeforeRotation;
            this.closedConnectionCounter.incrementAndGet();
            this.listener.onConnectionClose(accumulation, RequestResponsePacketPair.ReconstructionStatus.COMPLETE, instant, keysToReport);
            return Optional.of(CONNECTION_STATUS.CLOSED);
        }

        if (trafficObservation.hasConnectionException()) {
            accumulation.getOrCreateTransactionPair(iTrafficStreamKey, originTimestamp).holdTrafficStream(iTrafficStreamKey);
            rotateAccumulationIfNecessary(iTrafficStreamKey.getConnectionId(), accumulation);
            this.exceptionConnectionCounter.incrementAndGet();
            accumulation.resetForNextRequest();
            log.atDebug()
                    .setMessage(() -> "Removing accumulated traffic pair due to recorded connection exception event for " + iTrafficStreamKey.getConnectionId())
                    .log();
            log.atTrace().setMessage(() -> "Accumulated object: " + accumulation).log();
            return Optional.of(CONNECTION_STATUS.ALIVE);
        }

        return Optional.empty();
    }

    /**
     * Handles an observation while the accumulation is in the ACCUMULATING_READS state.
     *
     * <ul>
     *   <li>Read: appends the payload to the request, counting a new request if no pair existed yet.</li>
     *   <li>End-of-message: hands the completed request over via {@code handleEndOfRequest}.</li>
     *   <li>Read segment: appends the segment payload to the request's in-progress segment, creating the
     *       request (and counting it) if needed. Only the segment payload is added; a read-segment
     *       observation has no plain-read payload, so nothing is appended as a regular packet.</li>
     *   <li>Segment end: finalizes the in-progress request segments.</li>
     *   <li>Request dropped: un-counts the request and resets the accumulation to ignore it.</li>
     * </ul>
     *
     * @param accumulation the connection's accumulation; must not be null
     * @param trafficObservation the observation to apply
     * @param iTrafficStreamKey key of the stream the observation came from; must not be null
     * @param instant timestamp to record with the data that is added
     * @return ALIVE when the observation was consumed; empty when the state is not ACCUMULATING_READS
     *         or the observation type is not one handled here
     * @throws NullPointerException if {@code accumulation} or {@code iTrafficStreamKey} is null
     */
    private Optional<CONNECTION_STATUS> handleObservationForReadState(@NonNull Accumulation accumulation, TrafficObservation trafficObservation, @NonNull ITrafficStreamKey iTrafficStreamKey, Instant instant) {
        if (accumulation == null) {
            throw new NullPointerException("accum is marked non-null but is null");
        }
        if (iTrafficStreamKey == null) {
            throw new NullPointerException("trafficStreamKey is marked non-null but is null");
        }
        if (accumulation.state != Accumulation.State.ACCUMULATING_READS) {
            return Optional.empty();
        }
        String connectionId = iTrafficStreamKey.getConnectionId();
        Instant originTimestamp = TrafficStreamUtils.instantFromProtoTimestamp(trafficObservation.getTs());
        if (trafficObservation.hasRead()) {
            // No pair yet means this read starts a new request.
            if (!accumulation.hasRrPair()) {
                this.requestCounter.incrementAndGet();
            }
            RequestResponsePacketPair rrPair = accumulation.getOrCreateTransactionPair(iTrafficStreamKey, originTimestamp);
            log.atTrace().setMessage(() -> {
                return "Adding request data for accum[" + connectionId + "]=" + accumulation;
            }).log();
            rrPair.addRequestData(instant, trafficObservation.getRead().getData().toByteArray());
            log.atTrace().setMessage(() -> {
                return "Added request data for accum[" + connectionId + "]=" + accumulation;
            }).log();
        } else if (trafficObservation.hasEndOfMessageIndicator()) {
            if (!$assertionsDisabled && !accumulation.hasRrPair()) {
                throw new AssertionError();
            }
            handleEndOfRequest(accumulation);
        } else if (trafficObservation.hasReadSegment()) {
            log.atTrace().setMessage(() -> {
                return "Adding request segment for accum[" + connectionId + "]=" + accumulation;
            }).log();
            RequestResponsePacketPair rrPair = accumulation.getOrCreateTransactionPair(iTrafficStreamKey, originTimestamp);
            if (rrPair.requestData == null) {
                rrPair.requestData = new HttpMessageAndTimestamp.Request(instant);
                this.requestCounter.incrementAndGet();
            }
            // Segment bytes go only into the in-progress segment; they become request data when the segment ends.
            rrPair.requestData.addSegment(trafficObservation.getReadSegment().getData().toByteArray());
            log.atTrace().setMessage(() -> {
                return "Added request segment for accum[" + connectionId + "]=" + accumulation;
            }).log();
        } else if (trafficObservation.hasSegmentEnd()) {
            RequestResponsePacketPair rrPair = accumulation.getRrPair();
            if (!$assertionsDisabled && !rrPair.requestData.hasInProgressSegment()) {
                throw new AssertionError();
            }
            rrPair.requestData.finalizeRequestSegments(instant);
        } else {
            if (!trafficObservation.hasRequestDropped()) {
                return Optional.empty();
            }
            // The request was counted when its first read arrived; undo that before forgetting it.
            this.requestCounter.decrementAndGet();
            handleDroppedRequestForAccumulation(accumulation);
        }
        return Optional.of(CONNECTION_STATUS.ALIVE);
    }

    /**
     * Handles an observation while the accumulation is in the ACCUMULATING_WRITES state.
     *
     * <p>Writes and write segments are appended to the response, and a segment end finalizes the
     * in-progress response segments. A read or read segment means the next request has begun: the
     * finished pair is flushed (counted as a reused connection) and the observation is re-dispatched
     * to the read-state handler.
     *
     * @return ALIVE when consumed here, the read handler's result for reads, otherwise empty
     * @throws NullPointerException if {@code iTrafficStreamKey} is null
     */
    private Optional<CONNECTION_STATUS> handleObservationForWriteState(Accumulation accumulation, TrafficObservation trafficObservation, @NonNull ITrafficStreamKey iTrafficStreamKey, Instant instant) {
        Objects.requireNonNull(iTrafficStreamKey, "trafficStreamKey is marked non-null but is null");
        if (accumulation.state != Accumulation.State.ACCUMULATING_WRITES) {
            return Optional.empty();
        }
        final String connectionId = iTrafficStreamKey.getConnectionId();

        if (trafficObservation.hasWrite()) {
            RequestResponsePacketPair pair = accumulation.getRrPair();
            log.atTrace().setMessage(() -> "Adding response data for accum[" + connectionId + "]=" + accumulation).log();
            pair.addResponseData(instant, trafficObservation.getWrite().getData().toByteArray());
            log.atTrace().setMessage(() -> "Added response data for accum[" + connectionId + "]=" + accumulation).log();
        } else if (trafficObservation.hasWriteSegment()) {
            log.atTrace().setMessage(() -> "Adding response segment for accum[" + connectionId + "]=" + accumulation).log();
            RequestResponsePacketPair pair = accumulation.getRrPair();
            if (pair.responseData == null) {
                pair.responseData = new HttpMessageAndTimestamp.Response(instant);
            }
            pair.responseData.addSegment(trafficObservation.getWriteSegment().getData().toByteArray());
            log.atTrace().setMessage(() -> "Added response segment for accum[" + connectionId + "]=" + accumulation).log();
        } else if (trafficObservation.hasSegmentEnd()) {
            RequestResponsePacketPair pair = accumulation.getRrPair();
            if (!$assertionsDisabled && !pair.responseData.hasInProgressSegment()) {
                throw new AssertionError();
            }
            pair.responseData.finalizeRequestSegments(instant);
        } else if (trafficObservation.hasRead() || trafficObservation.hasReadSegment()) {
            // A new request is starting on the same connection: flush the finished pair first.
            rotateAccumulationOnReadIfNecessary(connectionId, accumulation);
            return handleObservationForReadState(accumulation, trafficObservation, iTrafficStreamKey, instant);
        } else {
            return Optional.empty();
        }
        return Optional.of(CONNECTION_STATUS.ALIVE);
    }

    /**
     * Abandons the request currently being accumulated: every stream key held by the current pair
     * (if there is one) is reported to the listener as ignored, then the accumulation is reset so
     * that the current request is forgotten.
     */
    private void handleDroppedRequestForAccumulation(Accumulation accumulation) {
        if (accumulation.hasRrPair()) {
            accumulation.getRrPair().getTrafficStreamsHeld().forEach(this.listener::onTrafficStreamIgnored);
        }
        log.atTrace().setMessage(() -> "resetting to forget " + accumulation.trafficChannelKey).log();
        accumulation.resetToIgnoreAndForgetCurrentRequest();
        log.atTrace().setMessage(() -> "done resetting to forget and accum=" + accumulation).log();
    }

    /**
     * Flushes the in-progress response as COMPLETE when the accumulation is in ACCUMULATING_WRITES.
     *
     * @param str connection id, used only in the debug log line
     * @return true if a response was flushed, false if the accumulation was in any other state
     */
    private boolean rotateAccumulationIfNecessary(String str, Accumulation accumulation) {
        boolean responseInProgress = accumulation.state == Accumulation.State.ACCUMULATING_WRITES;
        if (responseInProgress) {
            log.atDebug().setMessage(() -> "handling EOM for accum[" + str + "]=" + accumulation).log();
            handleEndOfResponse(accumulation, RequestResponsePacketPair.ReconstructionStatus.COMPLETE);
        }
        return responseInProgress;
    }

    /**
     * Same as {@code rotateAccumulationIfNecessary}, but additionally counts the rotation as a
     * request arriving on a reused connection.
     *
     * @return true if a response was flushed (and the reuse counter incremented)
     */
    private boolean rotateAccumulationOnReadIfNecessary(String str, Accumulation accumulation) {
        boolean rotated = rotateAccumulationIfNecessary(str, accumulation);
        if (rotated) {
            this.reusedKeepAliveCounter.incrementAndGet();
        }
        return rotated;
    }

    /**
     * Completes the request side of the current pair: the pair is switched from gathering the request
     * to gathering the response, the listener receives the finished request, the continuation it
     * returns is stored on the pair, and the state moves to ACCUMULATING_WRITES.
     *
     * @return always true
     */
    private boolean handleEndOfRequest(Accumulation accumulation) {
        if (!$assertionsDisabled && accumulation.state != Accumulation.State.ACCUMULATING_READS) {
            throw new AssertionError("state == " + accumulation.state);
        }
        Accumulation.RequestResponsePacketPairWithCallback pairAndCallback = accumulation.getRrPairWithCallback();
        RequestResponsePacketPair pair = pairAndCallback.pair;
        HttpMessageAndTimestamp request = pair.requestData;
        if (!$assertionsDisabled && request == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && request.hasInProgressSegment()) {
            throw new AssertionError();
        }
        // The request context is fetched before the pair rotates to the response side.
        IReplayContexts.IRequestAccumulationContext requestContext = pair.getRequestContext();
        pair.rotateRequestGatheringToResponse();
        Consumer<RequestResponsePacketPair> continuation = this.listener.onRequestReceived(requestContext, request);
        pairAndCallback.setFullDataContinuation(continuation);
        accumulation.state = Accumulation.State.ACCUMULATING_WRITES;
        return true;
    }

    /**
     * Completes the current pair: records the given completion status on it, invokes the continuation
     * stored when the request was delivered, and resets the accumulation for the next request.
     */
    private void handleEndOfResponse(Accumulation accumulation, RequestResponsePacketPair.ReconstructionStatus reconstructionStatus) {
        if (!$assertionsDisabled && accumulation.state != Accumulation.State.ACCUMULATING_WRITES) {
            throw new AssertionError();
        }
        Accumulation.RequestResponsePacketPairWithCallback pairAndCallback = accumulation.getRrPairWithCallback();
        RequestResponsePacketPair finishedPair = pairAndCallback.pair;
        finishedPair.completionStatus = reconstructionStatus;
        pairAndCallback.getFullDataContinuation().accept(finishedPair);
        log.atTrace().setMessage("resetting for end of response").log();
        accumulation.resetForNextRequest();
    }

    /**
     * Flushes every accumulation that is still live as CLOSED_PREMATURELY, counting each one, and then
     * empties the live-streams map.
     */
    public void close() {
        this.liveStreams.values().forEach(pending -> {
            requestsTerminatedUponAccumulatorCloseCounter.incrementAndGet();
            fireAccumulationsCallbacksAndClose(pending, RequestResponsePacketPair.ReconstructionStatus.CLOSED_PREMATURELY);
        });
        this.liveStreams.clear();
    }

    /**
     * Flushes an accumulation that is being torn down (expired, or still live at shutdown).
     *
     * <ul>
     *   <li>ACCUMULATING_READS: the partial request is not delivered; a warning is logged and, if a
     *       pair exists, the listener is told that its held stream keys expired.</li>
     *   <li>ACCUMULATING_WRITES: the in-progress response is completed with the given status.</li>
     *   <li>WAITING_FOR_NEXT_READ_CHUNK / IGNORING_LAST_REQUEST: nothing to flush.</li>
     * </ul>
     *
     * <p>On every path, including when the switch throws, the listener receives exactly one
     * connection-close notification if the accumulation has signaled requests. That notification lives
     * solely in the {@code finally} block so it cannot fire twice.
     *
     * @param accumulation the accumulation to flush
     * @param reconstructionStatus status to attach to whatever is reported
     * @throws IllegalStateException if the accumulation is in a state this method does not know
     */
    private void fireAccumulationsCallbacksAndClose(Accumulation accumulation, RequestResponsePacketPair.ReconstructionStatus reconstructionStatus) {
        try {
            switch (accumulation.state) {
                case ACCUMULATING_READS:
                    log.atWarn().setMessage("Terminating a TrafficStream reconstruction before data was accumulated for " + accumulation.trafficChannelKey + " assuming an empty server interaction and NOT reproducing this to the target cluster.").log();
                    if (accumulation.hasRrPair()) {
                        this.listener.onTrafficStreamsExpired(reconstructionStatus, accumulation.trafficChannelKey.getTrafficStreamsContext(), Collections.unmodifiableList(accumulation.getRrPair().trafficStreamKeysBeingHeld));
                    }
                    return;
                case ACCUMULATING_WRITES:
                    handleEndOfResponse(accumulation, reconstructionStatus);
                    break;
                case WAITING_FOR_NEXT_READ_CHUNK:
                case IGNORING_LAST_REQUEST:
                    break;
                default:
                    throw new IllegalStateException("Unknown enum type: " + accumulation.state);
            }
        } finally {
            // Single close notification for all exits (return, break, or exception).
            if (accumulation.hasSignaledRequests()) {
                this.listener.onConnectionClose(accumulation, reconstructionStatus, accumulation.getLastTimestamp(), getTrafficStreamsHeldByAccum(accumulation));
            }
        }
    }

    // Assigns the class's blank static finals.
    static {
        // True when assertions are NOT enabled for this class; the explicit assertion checks test this flag.
        $assertionsDisabled = !CapturedTrafficToHttpTransactionAccumulator.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(CapturedTrafficToHttpTransactionAccumulator.class);
        // One-second granularity, passed to the ExpiringTrafficStreamMap in the constructor.
        EXPIRATION_GRANULARITY = Duration.ofSeconds(1L);
    }
}
