package org.opensearch.migrations.replay;

import io.netty.buffer.Unpooled;
import java.io.EOFException;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.opensearch.migrations.replay.RequestResponsePacketPair;
import org.opensearch.migrations.replay.datahandlers.IPacketFinalizingConsumer;
import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.datatypes.RawPackets;
import org.opensearch.migrations.replay.datatypes.TransformedPackets;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.tracing.IRootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
import org.opensearch.migrations.replay.traffic.source.TrafficStreamLimiter;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;
import org.opensearch.migrations.transform.IAuthTransformerFactory;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerCore.class */
public abstract class TrafficReplayerCore {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TrafficReplayerCore.class);
    /** Built in the constructor from the JSON and auth transformers; used to create a transforming handler per request. */
    private final PacketToTransformingHttpHandlerFactory inputRequestTransformerFactory;
    /** Connection pool handed to the constructor; not referenced by the code in this class (presumably used by subclasses). */
    protected final ClientConnectionPool clientConnectionPool;
    /** Limiter through which each request's replay work is queued and later reported as done. */
    protected final TrafficStreamLimiter liveTrafficStreamLimiter;
    /** Incremented for each transaction written without an error and without an ERROR transformation status. */
    protected final AtomicInteger successfulRequestCount;
    /** Incremented for each transaction whose result carries an error or an ERROR transformation status. */
    protected final AtomicInteger exceptionRequestCount;
    /** Root context; used here to create a context for every chunk read from the capture source. */
    public final IRootReplayerContext topLevelContext;
    /** Tracks in-flight transactions by request key; entries are removed when a transaction has been handled. */
    protected final IWorkTracker<Void> requestWorkTracker;
    /** When set to true, {@code pullCaptureFromSourceToAccumulator} stops before issuing its next read. */
    protected final AtomicBoolean stopReadingRef;
    /** Holds the future for the most recently requested chunk of traffic streams. */
    protected final AtomicReference<CompletableFuture<List<ITrafficStreamWithKey>>> nextChunkFutureRef;

    /* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerCore$IWorkTracker.class */
    /**
     * Registry of outstanding work items keyed by replayer request key.
     *
     * <p>Within this class, entries are added when a request is received and removed once the
     * transaction has been handled.  NOTE(review): thread-safety requirements are not visible
     * here — confirm against the implementations.
     *
     * @param <T> result type of the tracked futures
     */
    public interface IWorkTracker<T> {
        /** Registers the future that represents the outstanding work for the given request key. */
        void put(UniqueReplayerRequestKey uniqueReplayerRequestKey, DiagnosticTrackableCompletableFuture<String, T> diagnosticTrackableCompletableFuture);

        /** Removes the entry for the given request key. */
        void remove(UniqueReplayerRequestKey uniqueReplayerRequestKey);

        /** Presumably reports whether no work is currently tracked; not called from this class. */
        boolean isEmpty();

        /** Presumably reports the number of tracked work items; not called from this class. */
        int size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerCore$TrafficReplayerAccumulationCallbacks.class */
    public class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks {
        /** Engine that receives first-packet timestamps, schedules the replayed requests and closes connections. */
        private final ReplayEngine replayEngine;
        /** Receives every packaged source/target tuple once a transaction has completed. */
        private Consumer<SourceTargetCaptureTuple> resultTupleConsumer;
        /** Source on which fully handled traffic streams are committed. */
        private ITrafficCaptureSource trafficCaptureSource;

        /**
         * Starts replaying a newly accumulated source request and returns the callback to invoke once the
         * rest of the captured transaction has been accumulated.
         *
         * <p>The request is queued on the {@code liveTrafficStreamLimiter}; when the limiter runs the work
         * item, the request is transformed and sent and the outcome is relayed into an internal
         * "request push" future.  A second future is registered with {@code requestWorkTracker} under the
         * request key; it completes only after the returned consumer has passed the outcome to
         * {@code handleCompletedTransaction}.
         *
         * @param iReplayerHttpTransactionContext context of the transaction being replayed
         * @param httpMessageAndTimestamp the captured request and its packet timestamps
         * @return consumer that, given the accumulated request/response pair, chains the result handling
         *         onto the outcome of the replayed request
         * @throws NullPointerException if either argument is null
         */
        @Override // org.opensearch.migrations.replay.AccumulationCallbacks
        public Consumer<RequestResponsePacketPair> onRequestReceived(@NonNull IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, @NonNull HttpMessageAndTimestamp httpMessageAndTimestamp) {
            if (iReplayerHttpTransactionContext == null) {
                throw new NullPointerException("ctx is marked non-null but is null");
            }
            if (httpMessageAndTimestamp == null) {
                throw new NullPointerException("request is marked non-null but is null");
            }
            this.replayEngine.setFirstTimestamp(httpMessageAndTimestamp.getFirstPacketTimestamp());

            // Completed only after the whole transaction (send + result handling) has finished.
            final StringTrackableCompletableFuture<Void> allWorkFinishedForTransaction = new StringTrackableCompletableFuture<Void>(
                new CompletableFuture<>(),
                () -> "waiting for " + iReplayerHttpTransactionContext + " to be queued and run through TrafficStreamLimiter"
            );
            // Completed with the outcome of transforming and sending the request.
            final StringTrackableCompletableFuture<TransformedTargetRequestAndResponse> requestPushFuture = new StringTrackableCompletableFuture<TransformedTargetRequestAndResponse>(
                new CompletableFuture<>(),
                () -> "Waiting to get response from target"
            );
            final UniqueReplayerRequestKey replayerRequestKey = iReplayerHttpTransactionContext.getReplayerRequestKey();

            TrafficReplayerCore.this.liveTrafficStreamLimiter.queueWork(1, iReplayerHttpTransactionContext, workItem -> {
                TrafficReplayerCore.this.transformAndSendRequest(this.replayEngine, httpMessageAndTimestamp, iReplayerHttpTransactionContext).future.whenComplete((response, failure) -> {
                    // Release the limiter slot before publishing the outcome.
                    TrafficReplayerCore.this.liveTrafficStreamLimiter.doneProcessing(workItem);
                    if (failure != null) {
                        requestPushFuture.future.completeExceptionally(failure);
                    } else {
                        requestPushFuture.future.complete(response);
                    }
                });
            });

            if (!allWorkFinishedForTransaction.future.isDone()) {
                TrafficReplayerCore.log.trace("Adding " + replayerRequestKey + " to targetTransactionInProgressMap");
                TrafficReplayerCore.this.requestWorkTracker.put(replayerRequestKey, allWorkFinishedForTransaction);
                // Re-check after registering so an already-completed future is not left behind in the tracker.
                if (allWorkFinishedForTransaction.future.isDone()) {
                    TrafficReplayerCore.this.requestWorkTracker.remove(replayerRequestKey);
                }
            }

            return requestResponsePacketPair -> {
                requestPushFuture.map(pushFuture -> pushFuture.handle((response, failure) -> {
                    TrafficReplayerCore.log.atInfo().setMessage(() -> {
                        return "Done receiving captured stream for " + iReplayerHttpTransactionContext + ":" + requestResponsePacketPair.requestData;
                    }).log();
                    TrafficReplayerCore.log.atTrace().setMessage(() -> {
                        return "Summary response value for " + replayerRequestKey + " returned=" + response;
                    }).log();
                    return handleCompletedTransaction(iReplayerHttpTransactionContext, requestResponsePacketPair, response, failure);
                }), () -> "logging summary").whenComplete((ignored, failure) -> {
                    // Propagate the final outcome to the tracked "all work finished" future.
                    if (failure != null) {
                        allWorkFinishedForTransaction.future.completeExceptionally(failure);
                    } else {
                        allWorkFinishedForTransaction.future.complete(null);
                    }
                }, () -> "");
            };
        }

        /**
         * Finishes a transaction once the outcome of the replayed request is known: packages and writes
         * the result tuple, commits the traffic streams held by the pair and closes the contexts.
         *
         * <p>The http transaction context and the tuple context are managed with try-with-resources, so
         * both are closed on every path, including when writing the tuple or committing fails.  The
         * request key is always removed from {@code requestWorkTracker}.
         *
         * @param iReplayerHttpTransactionContext context of the transaction; must not be null
         * @param requestResponsePacketPair the accumulated source request/response pair
         * @param transformedTargetRequestAndResponse outcome of the replayed request (unused when {@code th} is set)
         * @param th failure reported for the replayed request, or null.  A non-{@link Exception} throwable is
         *           logged and rethrown as-is instead of being packaged into a tuple.
         * @return always null (a {@link Void} so the method can be used from {@code CompletableFuture.handle})
         * @throws NullPointerException if the context is null
         */
        Void handleCompletedTransaction(@NonNull IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, RequestResponsePacketPair requestResponsePacketPair, TransformedTargetRequestAndResponse transformedTargetRequestAndResponse, Throwable th) {
            // Checked before the try so the finally block below never dereferences a null context
            // (which would replace this descriptive exception with an anonymous NPE).
            if (iReplayerHttpTransactionContext == null) {
                throw new NullPointerException("context is marked non-null but is null");
            }
            try (IReplayContexts.IReplayerHttpTransactionContext httpTransactionContext = requestResponsePacketPair.getHttpTransactionContext()) {
                if (th != null && !(th instanceof Exception)) {
                    // Serious throwables are not packaged for the callback; escalate them unchanged.
                    TrafficReplayerCore.log.atError().setCause(th).setMessage(() -> {
                        return "Throwable passed to handle() for " + iReplayerHttpTransactionContext + ".  Rethrowing.";
                    }).log();
                    throw this.<RuntimeException>rethrowUnchecked(th);
                }
                try (IReplayContexts.ITupleHandlingContext tupleContext = httpTransactionContext.createTupleContext()) {
                    packageAndWriteResponse(tupleContext, this.resultTupleConsumer, requestResponsePacketPair, transformedTargetRequestAndResponse, (Exception) th);
                }
                commitTrafficStreams(requestResponsePacketPair.completionStatus, requestResponsePacketPair.trafficStreamKeysBeingHeld);
                return null;
            } catch (Error e) {
                TrafficReplayerCore.log.atError().setCause(e).setMessage(() -> {
                    return "Caught error and initiating TrafficReplayer shutdown";
                }).log();
                TrafficReplayerCore.this.shutdown(e);
                throw e;
            } catch (Exception e2) {
                TrafficReplayerCore.log.atError().setMessage("Unexpected exception while sending the aggregated response and context for {} to the callback.  Proceeding, but the tuple receiver context may be compromised.").addArgument(iReplayerHttpTransactionContext).setCause(e2).log();
                throw e2;
            } finally {
                UniqueReplayerRequestKey replayerRequestKey = iReplayerHttpTransactionContext.getReplayerRequestKey();
                TrafficReplayerCore.this.requestWorkTracker.remove(replayerRequestKey);
                TrafficReplayerCore.log.trace("removed rrPair.requestData to targetTransactionInProgressMap for " + replayerRequestKey);
            }
        }

        /**
         * Rethrows the given throwable without wrapping it and without requiring a {@code throws} clause.
         * Never returns normally; the return type only lets callers write {@code throw rethrowUnchecked(t)}.
         */
        @SuppressWarnings("unchecked") // deliberate: the erased cast is what lets the throwable pass unchecked
        private <T extends Throwable> RuntimeException rethrowUnchecked(Throwable throwable) throws T {
            throw (T) throwable;
        }

        /**
         * Commits the traffic streams that were still held for a channel when they expired, subject to
         * the reconstruction status (see the status-based {@code commitTrafficStreams} overload).
         *
         * @throws NullPointerException if the channel context or the key list is null
         */
        @Override // org.opensearch.migrations.replay.AccumulationCallbacks
        public void onTrafficStreamsExpired(RequestResponsePacketPair.ReconstructionStatus reconstructionStatus, @NonNull IReplayContexts.IChannelKeyContext iChannelKeyContext, @NonNull List<ITrafficStreamKey> list) {
            Objects.requireNonNull(iChannelKeyContext, "ctx is marked non-null but is null");
            Objects.requireNonNull(list, "trafficStreamKeysBeingHeld is marked non-null but is null");
            commitTrafficStreams(reconstructionStatus, list);
        }

        /**
         * Commits the given traffic streams unless the reconstruction status is
         * {@code CLOSED_PREMATURELY}, in which case nothing is committed.
         */
        private void commitTrafficStreams(RequestResponsePacketPair.ReconstructionStatus reconstructionStatus, List<ITrafficStreamKey> list) {
            final boolean shouldCommit = RequestResponsePacketPair.ReconstructionStatus.CLOSED_PREMATURELY != reconstructionStatus;
            commitTrafficStreams(shouldCommit, list);
        }

        /**
         * Closes the lifecycle context of every given traffic stream key and commits the stream on the
         * capture source.  Does nothing when {@code z} is false or the list is null.
         */
        private void commitTrafficStreams(boolean z, List<ITrafficStreamKey> list) {
            if (!z || list == null) {
                return;
            }
            for (ITrafficStreamKey streamKey : list) {
                // Close the stream's context first, then acknowledge it to the source.
                streamKey.getTrafficStreamsContext().close();
                this.trafficCaptureSource.commitTrafficStream(streamKey);
            }
        }

        /**
         * Handles a captured connection close: records the timestamp with the replay engine, asks the
         * engine to close the corresponding connection and, once that close future completes (normally
         * or exceptionally), commits the held traffic streams subject to the reconstruction status.
         *
         * @throws NullPointerException if the channel context, timestamp or key list is null
         */
        @Override // org.opensearch.migrations.replay.AccumulationCallbacks
        public void onConnectionClose(int i, @NonNull IReplayContexts.IChannelKeyContext iChannelKeyContext, int i2, RequestResponsePacketPair.ReconstructionStatus reconstructionStatus, @NonNull Instant instant, @NonNull List<ITrafficStreamKey> list) {
            Objects.requireNonNull(iChannelKeyContext, "ctx is marked non-null but is null");
            Objects.requireNonNull(instant, "timestamp is marked non-null but is null");
            Objects.requireNonNull(list, "trafficStreamKeysBeingHeld is marked non-null but is null");
            this.replayEngine.setFirstTimestamp(instant);
            this.replayEngine.closeConnection(i, iChannelKeyContext, i2, instant).map(
                closeFuture -> closeFuture.whenComplete(
                    // The outcome of the close is not inspected; the commit happens either way.
                    (ignoredResult, ignoredFailure) -> commitTrafficStreams(reconstructionStatus, list)
                ),
                () -> "closing the channel in the ReplayEngine"
            );
        }

        /**
         * Unconditionally commits a traffic stream that the accumulator ignored.
         *
         * @throws NullPointerException if the context is null
         */
        @Override // org.opensearch.migrations.replay.AccumulationCallbacks
        public void onTrafficStreamIgnored(@NonNull IReplayContexts.ITrafficStreamsLifecycleContext iTrafficStreamsLifecycleContext) {
            Objects.requireNonNull(iTrafficStreamsLifecycleContext, "ctx is marked non-null but is null");
            final List<ITrafficStreamKey> ignoredKeys = List.of(iTrafficStreamsLifecycleContext.getTrafficStreamKey());
            commitTrafficStreams(true, ignoredKeys);
        }

        /**
         * Builds the source/target tuple for a finished transaction, hands it to the consumer and then
         * updates the success/exception counters.
         *
         * <p>The tuple is closed exactly once, directly after the consumer has been called (or has
         * failed); the bookkeeping that follows runs outside the resource scope so a failure there can
         * no longer trigger a second {@code close()}.
         *
         * @param iTupleHandlingContext context used to build the tuple and in log messages
         * @param consumer receiver of the packaged tuple
         * @param requestResponsePacketPair the accumulated source request/response pair
         * @param transformedTargetRequestAndResponse outcome of the replayed request; only read when {@code exc} is null
         * @param exc failure of the replayed request, or null
         * @throws CompletionException wrapping {@code exc} when it is non-null, after the tuple was written
         */
        private void packageAndWriteResponse(IReplayContexts.ITupleHandlingContext iTupleHandlingContext, Consumer<SourceTargetCaptureTuple> consumer, RequestResponsePacketPair requestResponsePacketPair, TransformedTargetRequestAndResponse transformedTargetRequestAndResponse, Exception exc) {
            TrafficReplayerCore.log.trace("done sending and finalizing data to the packet handler");
            try (SourceTargetCaptureTuple sourceTargetCaptureTuple = TrafficReplayerCore.getSourceTargetCaptureTuple(iTupleHandlingContext, requestResponsePacketPair, transformedTargetRequestAndResponse, exc)) {
                TrafficReplayerCore.log.atInfo().setMessage(() -> {
                    return "Source/Target Request/Response tuple: " + sourceTargetCaptureTuple;
                }).log();
                consumer.accept(sourceTargetCaptureTuple);
            }

            if (exc != null) {
                throw new CompletionException(exc);
            }
            if (transformedTargetRequestAndResponse.getError() != null) {
                TrafficReplayerCore.log.atInfo().setCause(transformedTargetRequestAndResponse.getError()).setMessage("Exception for {}: ").addArgument(iTupleHandlingContext).log();
                TrafficReplayerCore.this.exceptionRequestCount.incrementAndGet();
            } else if (transformedTargetRequestAndResponse.getTransformationStatus() != HttpRequestTransformationStatus.ERROR) {
                TrafficReplayerCore.this.successfulRequestCount.incrementAndGet();
            } else {
                // getError() was null in the first check above, so no cause is normally attached here.
                TrafficReplayerCore.log.atInfo().setCause(transformedTargetRequestAndResponse.getError()).setMessage("Unknown error transforming {}: ").addArgument(iTupleHandlingContext).log();
                TrafficReplayerCore.this.exceptionRequestCount.incrementAndGet();
            }
        }

        /**
         * Creates the callbacks bound to the enclosing replayer.
         *
         * @param replayEngine engine used to schedule requests and close connections
         * @param consumer receiver of every completed source/target tuple
         * @param iTrafficCaptureSource source on which handled traffic streams are committed
         */
        public TrafficReplayerAccumulationCallbacks(ReplayEngine replayEngine, Consumer<SourceTargetCaptureTuple> consumer, ITrafficCaptureSource iTrafficCaptureSource) {
            this.trafficCaptureSource = iTrafficCaptureSource;
            this.resultTupleConsumer = consumer;
            this.replayEngine = replayEngine;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TrafficReplayerCore(IRootReplayerContext iRootReplayerContext, URI uri, IAuthTransformerFactory iAuthTransformerFactory, IJsonTransformer iJsonTransformer, ClientConnectionPool clientConnectionPool, TrafficStreamLimiter trafficStreamLimiter, IWorkTracker<Void> iWorkTracker) {
        this.topLevelContext = iRootReplayerContext;
        if (uri.getPort() < 0) {
            throw new IllegalArgumentException("Port not present for URI: " + uri);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("Hostname not present for URI: " + uri);
        }
        if (uri.getScheme() == null) {
            throw new IllegalArgumentException("Scheme (http|https) is not present for URI: " + uri);
        }
        this.liveTrafficStreamLimiter = trafficStreamLimiter;
        this.clientConnectionPool = clientConnectionPool;
        this.requestWorkTracker = iWorkTracker;
        this.inputRequestTransformerFactory = new PacketToTransformingHttpHandlerFactory(iJsonTransformer, iAuthTransformerFactory);
        this.successfulRequestCount = new AtomicInteger();
        this.exceptionRequestCount = new AtomicInteger();
        this.nextChunkFutureRef = new AtomicReference<>();
        this.stopReadingRef = new AtomicBoolean();
    }

    /**
     * Hook for shutting the replayer down.  This class calls it with the {@link Error} caught while
     * handling a completed transaction, just before that error is rethrown; the returned future is
     * not awaited there.
     *
     * @param error the error that triggered the shutdown
     * @return a future for the shutdown — presumably completed when shutdown finishes; confirm against implementations
     */
    protected abstract CompletableFuture<Void> shutdown(Error error);

    /**
     * Packages the outcome of a replayed transaction into a {@link SourceTargetCaptureTuple}.
     *
     * <p>When {@code exc} is non-null the exception is logged and the tuple carries empty transformed
     * packets, no response data, an {@code ERROR} transformation status, the exception itself and a
     * zero duration.  Otherwise the tuple is populated from {@code transformedTargetRequestAndResponse}.
     *
     * @param iTupleHandlingContext context attached to the tuple; must not be null
     * @param requestResponsePacketPair the accumulated source request/response pair
     * @param transformedTargetRequestAndResponse outcome of the replayed request; only read when {@code exc} is null
     * @param exc failure of the replayed request, or null
     * @return the packaged tuple
     * @throws NullPointerException if the context is null
     */
    private static SourceTargetCaptureTuple getSourceTargetCaptureTuple(@NonNull IReplayContexts.ITupleHandlingContext iTupleHandlingContext, RequestResponsePacketPair requestResponsePacketPair, TransformedTargetRequestAndResponse transformedTargetRequestAndResponse, Exception exc) {
        if (iTupleHandlingContext == null) {
            throw new NullPointerException("tupleHandlingContext is marked non-null but is null");
        }
        if (exc != null) {
            log.error("Got exception in CompletableFuture callback: ", exc);
            return new SourceTargetCaptureTuple(
                iTupleHandlingContext,
                requestResponsePacketPair,
                new TransformedPackets(),
                new ArrayList<>(),
                HttpRequestTransformationStatus.ERROR,
                exc,
                Duration.ZERO
            );
        }
        return new SourceTargetCaptureTuple(
            iTupleHandlingContext,
            requestResponsePacketPair,
            transformedTargetRequestAndResponse.requestPackets,
            // Keep only the payloads; the receipt timestamps are dropped.
            transformedTargetRequestAndResponse.getReceiptTimeAndResponsePackets()
                .map(timeAndPacket -> timeAndPacket.getValue())
                .collect(Collectors.toList()),
            transformedTargetRequestAndResponse.getTransformationStatus(),
            transformedTargetRequestAndResponse.getError(),
            transformedTargetRequestAndResponse.getResponseDuration()
        );
    }

    /**
     * Convenience overload that transforms and sends a captured request using this replayer's request
     * transformer factory, taking the timestamps and packet bytes from the captured message.
     *
     * @param replayEngine engine used to schedule the transformation and the request
     * @param httpMessageAndTimestamp captured request; its {@code packetBytes} must not be null
     * @param iReplayerHttpTransactionContext context of the transaction being replayed
     * @return future for the transformed request together with the target's response
     * @throws NullPointerException if the message's packet bytes are null
     */
    public DiagnosticTrackableCompletableFuture<String, TransformedTargetRequestAndResponse> transformAndSendRequest(ReplayEngine replayEngine, HttpMessageAndTimestamp httpMessageAndTimestamp, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext) {
        return transformAndSendRequest(
            this.inputRequestTransformerFactory,
            replayEngine,
            iReplayerHttpTransactionContext,
            httpMessageAndTimestamp.getFirstPacketTimestamp(),
            httpMessageAndTimestamp.getLastPacketTimestamp(),
            // A bound method reference null-checks its receiver when it is evaluated.
            httpMessageAndTimestamp.packetBytes::stream
        );
    }

    /**
     * Schedules the transformation of a captured request and, once transformed, schedules sending it
     * through the replay engine.
     *
     * <p>Failures are folded into the result rather than failing the returned future: a failure while
     * sending yields a result that still carries the transformed packets and transformation status, and
     * a failure before that yields a result holding only the throwable.  An exception thrown
     * synchronously while setting the pipeline up is returned as a failed future.
     *
     * @param packetToTransformingHttpHandlerFactory factory for the per-request transforming handler
     * @param replayEngine engine used to schedule the work
     * @param iReplayerHttpTransactionContext context of the transaction being replayed
     * @param instant timestamp of the request's first packet
     * @param instant2 timestamp of the request's last packet
     * @param supplier supplies the raw request packets
     * @return future for the transformed request together with the target's response
     * @throws NullPointerException if either timestamp is null
     */
    public static DiagnosticTrackableCompletableFuture<String, TransformedTargetRequestAndResponse> transformAndSendRequest(PacketToTransformingHttpHandlerFactory packetToTransformingHttpHandlerFactory, ReplayEngine replayEngine, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, @NonNull Instant instant, @NonNull Instant instant2, Supplier<Stream<byte[]>> supplier) {
        Objects.requireNonNull(instant, "start is marked non-null but is null");
        Objects.requireNonNull(instant2, "end is marked non-null but is null");
        try {
            DiagnosticTrackableCompletableFuture transformationFuture = replayEngine.scheduleTransformationWork(
                iReplayerHttpTransactionContext,
                instant,
                () -> transformAllData(packetToTransformingHttpHandlerFactory.create(iReplayerHttpTransactionContext), supplier)
            );
            log.atDebug()
                .setMessage(() -> "finalizeRequest future for transformation of " + iReplayerHttpTransactionContext + " = " + transformationFuture)
                .log();
            return transformationFuture.thenCompose(
                transformed -> replayEngine.scheduleRequest(
                        iReplayerHttpTransactionContext,
                        instant,
                        instant2,
                        ((TransformedPackets) transformed.transformedOutput).size(),
                        ((TransformedPackets) transformed.transformedOutput).streamRetained()
                    )
                    .map(
                        responseFuture -> responseFuture.thenApply(
                            rawResponse -> new TransformedTargetRequestAndResponse((TransformedPackets) transformed.transformedOutput, rawResponse, transformed.transformationStatus, rawResponse.error)
                        ),
                        () -> "(if applicable) packaging transformed result into a completed TransformedTargetRequestAndResponse object"
                    )
                    .map(
                        // A send failure still reports what was transformed.
                        packagedFuture -> packagedFuture.exceptionally(
                            failure -> new TransformedTargetRequestAndResponse((TransformedPackets) transformed.transformedOutput, transformed.transformationStatus, failure)
                        ),
                        () -> "(if applicable) packaging transformed result into a failed TransformedTargetRequestAndResponse object"
                    ),
                () -> "transitioning transformed packets onto the wire"
            ).map(
                // Any earlier failure (e.g. in the transformation) leaves nothing but the throwable to report.
                composedFuture -> composedFuture.exceptionally(failure -> new TransformedTargetRequestAndResponse(null, null, failure)),
                () -> "Checking for exception out of sending data to the target server"
            );
        } catch (Exception e) {
            log.debug("Caught exception in writeToSocket, so failing future");
            return StringTrackableCompletableFuture.failedFuture(e, () -> "TrafficReplayer.writeToSocketAndClose");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Feeds every packet from the supplier to the given consumer and then finalizes the request.
     *
     * <p>The futures returned by {@code consumeBytes} are only logged, not awaited.  If anything throws,
     * the packets are re-read from the supplier and logged in compressed form for later diagnosis
     * before the exception is rethrown.
     *
     * @param iPacketFinalizingConsumer handler that consumes the packets and produces the final result
     * @param supplier supplies the raw request packets; called again on failure for the diagnostic log
     * @param <R> result type produced by the consumer
     * @return the future returned by the consumer's {@code finalizeRequest()}
     */
    public static <R> DiagnosticTrackableCompletableFuture<String, R> transformAllData(IPacketFinalizingConsumer<R> iPacketFinalizingConsumer, Supplier<Stream<byte[]>> supplier) {
        try {
            final String handlerName = iPacketFinalizingConsumer.getClass().getSimpleName();
            supplier.get().map(Unpooled::wrappedBuffer).forEach(buffer -> {
                log.atDebug()
                    .setMessage(() -> handlerName + " sending " + buffer.readableBytes() + " bytes to the packetHandler")
                    .log();
                DiagnosticTrackableCompletableFuture<String, Void> consumeFuture = iPacketFinalizingConsumer.consumeBytes(buffer);
                log.atDebug()
                    .setMessage(() -> handlerName + " consumeFuture = " + consumeFuture)
                    .log();
            });
            log.atDebug()
                .setMessage(() -> handlerName + "  done sending bytes, now finalizing the request")
                .log();
            return iPacketFinalizingConsumer.finalizeRequest();
        } catch (Exception e) {
            log.atInfo()
                .setCause(e)
                .setMessage("Encountered an exception while transforming the http request.  The base64 gzipped traffic stream, for later diagnostic purposes, is: " + Utils.packetsToCompressedTrafficStream(supplier.get()))
                .log();
            throw e;
        }
    }

    /**
     * Reads chunks of traffic streams from the capture source and feeds each stream to the accumulator
     * until reading is stopped.
     *
     * <p>The loop ends when {@code stopReadingRef} is set, when the source reports an
     * {@link EOFException} (logged, then a normal return), or when a read fails for any other reason
     * (logged, then the original cause is rethrown unwrapped).
     *
     * @param iTrafficCaptureSource source of the captured traffic streams
     * @param capturedTrafficToHttpTransactionAccumulator receiver of every traffic stream that is read
     * @throws InterruptedException if the thread is interrupted while waiting for the next chunk
     */
    public void pullCaptureFromSourceToAccumulator(ITrafficCaptureSource iTrafficCaptureSource, CapturedTrafficToHttpTransactionAccumulator capturedTrafficToHttpTransactionAccumulator) throws InterruptedException {
        while (true) {
            log.trace("Reading next chunk from TrafficStream supplier");
            if (this.stopReadingRef.get()) {
                return;
            }
            // Publish the pending read through nextChunkFutureRef before blocking on it.
            this.nextChunkFutureRef.set(iTrafficCaptureSource.readNextTrafficStreamChunk(this.topLevelContext::createReadChunkContext));

            final List<ITrafficStreamWithKey> trafficStreams;
            try {
                trafficStreams = this.nextChunkFutureRef.get().get();
            } catch (ExecutionException e) {
                final Throwable cause = e.getCause();
                if (cause instanceof EOFException) {
                    log.atWarn().setCause(cause).setMessage("Got an EOF on the stream.  Done reading traffic streams.").log();
                    // End of input: stop instead of issuing another read that can only fail again.
                    return;
                }
                log.atWarn().setCause(e).setMessage("Done reading traffic streams due to exception.").log();
                throw TrafficReplayerCore.<RuntimeException>rethrowCauseUnchecked(cause);
            }

            if (log.isInfoEnabled()) {
                final String summary = trafficStreams.stream()
                    .map(streamWithKey -> TrafficStreamUtils.summarizeTrafficStream(streamWithKey.getStream()))
                    .collect(Collectors.joining(";"));
                if (!summary.isEmpty()) {
                    log.atInfo().log("TrafficStream Summary: {" + summary + "}");
                }
            }
            trafficStreams.forEach(capturedTrafficToHttpTransactionAccumulator::accept);
        }
    }

    /**
     * Rethrows the given throwable without wrapping it and without requiring a {@code throws} clause.
     * Never returns normally; the return type only lets callers write {@code throw rethrowCauseUnchecked(t)}.
     */
    @SuppressWarnings("unchecked") // deliberate: the erased cast is what lets the throwable pass unchecked
    private static <T extends Throwable> RuntimeException rethrowCauseUnchecked(Throwable throwable) throws T {
        throw (T) throwable;
    }
}
