package org.axonframework.eventsourcing;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.MultiSourceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.StreamableMessageSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventsourcing/MultiStreamableMessageSource.class */
public class MultiStreamableMessageSource implements StreamableMessageSource<TrackedEventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(MultiStreamableMessageSource.class);
    private final List<IdentifiedStreamableMessageSource> eventStreams;
    private final Comparator<Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator;

    /* loaded from: input_file:org/axonframework/eventsourcing/MultiStreamableMessageSource$Builder.class */
    /**
     * Builder for a {@link MultiStreamableMessageSource}.
     * <p>
     * Sources are registered under unique names and kept in registration order. One of them may
     * be flagged as the "long polling" source, in which case it is moved to the end of the list
     * handed to the {@link MultiStreamableMessageSource}. By default, events coming from different
     * sources are compared by their timestamp.
     */
    public static class Builder {
        /** Comparator used to order candidate events from the different sources; defaults to the event timestamp. */
        private Comparator<Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator =
                Comparator.comparing(
                        (Map.Entry<String, TrackedEventMessage<?>> candidate) -> candidate.getValue().getTimestamp());
        /** Registered sources keyed by name, in registration order. */
        private final Map<String, StreamableMessageSource<TrackedEventMessage<?>>> messageSourceMap =
                new LinkedHashMap<>();
        /** Name of the source to be placed last; the empty string means none was selected. */
        private String longPollingSource = "";

        /**
         * Registers a message source under the given name.
         *
         * @param str                     the name identifying the source; must not be registered yet
         * @param streamableMessageSource the source to register
         * @return this builder, for method chaining
         */
        public Builder addMessageSource(String str, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
            BuilderUtils.assertThat(
                    str,
                    candidateName -> !messageSourceMap.containsKey(candidateName),
                    "the messageSource name must be unique");
            messageSourceMap.put(str, streamableMessageSource);
            return this;
        }

        /**
         * Replaces the comparator used to pick between candidate events of different sources.
         *
         * @param comparator the comparator over (source name, event) entries
         * @return this builder, for method chaining
         */
        public Builder trackedEventComparator(Comparator<Map.Entry<String, TrackedEventMessage<?>>> comparator) {
            trackedEventComparator = comparator;
            return this;
        }

        /**
         * Selects the source that is placed last in the list of sources.
         *
         * @param str the name of an already registered source
         * @return this builder, for method chaining
         */
        public Builder longPollingSource(String str) {
            BuilderUtils.assertThat(
                    str,
                    candidateName -> messageSourceMap.containsKey(candidateName),
                    "Current configuration does not contain this message source");
            longPollingSource = str;
            return this;
        }

        /**
         * Creates the {@link MultiStreamableMessageSource} described by this builder.
         *
         * @return a new {@link MultiStreamableMessageSource}
         */
        public MultiStreamableMessageSource build() {
            return new MultiStreamableMessageSource(this);
        }

        /**
         * Lists the registered sources in registration order, except that the long polling source
         * (when registered) is moved to the very end.
         * <p>
         * NOTE(review): the decompiler reports this method was originally {@code private}.
         *
         * @return a new list of identified sources
         */
        public List<IdentifiedStreamableMessageSource> messageSources() {
            List<IdentifiedStreamableMessageSource> orderedSources = new ArrayList<>();
            for (Map.Entry<String, StreamableMessageSource<TrackedEventMessage<?>>> registration
                    : messageSourceMap.entrySet()) {
                String name = registration.getKey();
                if (!longPollingSource.equals(name)) {
                    orderedSources.add(new IdentifiedStreamableMessageSource(name, registration.getValue()));
                }
            }
            // The long polling source was skipped above and is appended last.
            if (messageSourceMap.containsKey(longPollingSource)) {
                orderedSources.add(new IdentifiedStreamableMessageSource(
                        longPollingSource, messageSourceMap.get(longPollingSource)));
            }
            return orderedSources;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/MultiStreamableMessageSource$IdentifiedStreamableMessageSource.class */
    /**
     * A {@link StreamableMessageSource} paired with the name it was registered under.
     * Every stream and token operation is forwarded unchanged to the wrapped source.
     */
    public static class IdentifiedStreamableMessageSource implements StreamableMessageSource<TrackedEventMessage<?>> {
        /** The wrapped source that performs the actual work. */
        private final StreamableMessageSource<TrackedEventMessage<?>> delegate;
        /** The name identifying the wrapped source. */
        private final String sourceId;

        /**
         * @param str                     the name identifying the source
         * @param streamableMessageSource the source to forward all calls to
         */
        public IdentifiedStreamableMessageSource(String str, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
            sourceId = str;
            delegate = streamableMessageSource;
        }

        /**
         * @return the name identifying the wrapped source
         */
        public String sourceId() {
            return sourceId;
        }

        /** Forwards to the wrapped source. */
        public BlockingStream<TrackedEventMessage<?>> openStream(TrackingToken trackingToken) {
            return delegate.openStream(trackingToken);
        }

        /** Forwards to the wrapped source. */
        public TrackingToken createTailToken() {
            return delegate.createTailToken();
        }

        /** Forwards to the wrapped source. */
        public TrackingToken createHeadToken() {
            return delegate.createHeadToken();
        }

        /** Forwards to the wrapped source. */
        public TrackingToken createTokenAt(Instant instant) {
            return delegate.createTokenAt(instant);
        }

        /** Forwards to the wrapped source. */
        public TrackingToken createTokenSince(Duration duration) {
            return delegate.createTokenSince(duration);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/MultiStreamableMessageSource$MultiSourceBlockingStream.class */
    /**
     * A {@link BlockingStream} that merges the streams of several identified sources into one.
     * <p>
     * Whenever a message is requested, the head of every underlying stream is peeked, the
     * configured comparator selects the smallest candidate, and only that candidate is consumed
     * from its stream. The returned message carries a {@link MultiSourceTrackingToken} advanced
     * for the source the message came from.
     * <p>
     * No synchronization is performed in this class.
     */
    public static class MultiSourceBlockingStream implements BlockingStream<TrackedEventMessage<?>> {
        /** The underlying streams, in the order the sources were supplied. */
        private final List<SourceIdAwareBlockingStream> messageStreams = new ArrayList<>();
        /** The same streams, keyed by the name of their source. */
        private final Map<String, SourceIdAwareBlockingStream> streamBySourceId = new HashMap<>();
        /** Selects which candidate (source name, message) entry is returned first. */
        private final Comparator<? super Map.Entry<String, TrackedEventMessage<?>>> trackedEventComparator;
        /** Combined position of this stream; replaced every time a message is handed out. */
        private MultiSourceTrackingToken trackingToken;
        /** Message already consumed from its source by {@link #peek()} but not yet handed out. */
        private TrackedEventMessage<?> peekedMessage;

        /**
         * Opens one stream per source, each at the position the given token holds for that source.
         * If opening any stream fails, the streams opened so far are closed before the exception
         * is rethrown.
         *
         * @param iterable                 the sources to open
         * @param multiSourceTrackingToken the combined token supplying the start position per source
         * @param comparator               the comparator deciding which candidate message goes first
         */
        public MultiSourceBlockingStream(Iterable<IdentifiedStreamableMessageSource> iterable, MultiSourceTrackingToken multiSourceTrackingToken, Comparator<? super Map.Entry<String, TrackedEventMessage<?>>> comparator) {
            this.trackedEventComparator = comparator;
            this.trackingToken = multiSourceTrackingToken;
            try {
                iterable.forEach(source -> {
                    String sourceId = source.sourceId();
                    SourceIdAwareBlockingStream stream = new SourceIdAwareBlockingStream(
                            sourceId, source.openStream(multiSourceTrackingToken.getTokenForStream(sourceId)));
                    this.messageStreams.add(stream);
                    this.streamBySourceId.put(sourceId, stream);
                });
            } catch (Exception e) {
                // Do not leak the streams that were opened before the failure.
                this.messageStreams.forEach(stream -> stream.close());
                throw e;
            }
        }

        /**
         * @return {@code true} if a peeked message is pending or any underlying stream reports an
         *         available message
         */
        public boolean hasNextAvailable() {
            return this.peekedMessage != null
                    || this.messageStreams.stream().anyMatch(stream -> stream.hasNextAvailable());
        }

        /**
         * Returns the next message without handing it out. The message is already consumed from
         * its underlying stream and cached until {@code nextAvailable} is invoked.
         *
         * @return the next message, or an empty {@link Optional} if no source has one ready
         */
        public Optional<TrackedEventMessage<?>> peek() {
            if (this.peekedMessage == null) {
                this.peekedMessage = doConsumeNext();
            }
            return Optional.ofNullable(this.peekedMessage);
        }

        /**
         * Consumes the smallest currently available candidate from its stream.
         *
         * @return the consumed message carrying the advanced combined token, or {@code null} when
         *         no source has a message ready or the thread was interrupted while consuming
         */
        private TrackedEventMessage<?> doConsumeNext() {
            Map<String, TrackedEventMessage<?>> candidates = new HashMap<>();
            peekForMessages(candidates);
            Optional<Map.Entry<String, TrackedEventMessage<?>>> chosen =
                    candidates.entrySet().stream().min(this.trackedEventComparator);
            if (!chosen.isPresent()) {
                return null;
            }
            String sourceId = chosen.get().getKey();
            TrackedEventMessage<?> candidate = chosen.get().getValue();
            try {
                // Consume the chosen message from its stream so it is not offered again.
                return messageSource(sourceId).nextAvailable()
                        .withTrackingToken(this.trackingToken.advancedTo(sourceId, candidate.trackingToken()));
            } catch (InterruptedException e) {
                MultiStreamableMessageSource.logger.warn("Thread Interrupted whilst consuming next message", e);
                Thread.currentThread().interrupt();
                return null;
            }
        }

        /**
         * @param str the name of a source
         * @return the stream opened for that source, or {@code null} if the name is unknown
         */
        private BlockingStream<TrackedEventMessage<?>> messageSource(String str) {
            return this.streamBySourceId.get(str);
        }

        /**
         * Checks for an available message, waiting up to the given timeout.
         * <p>
         * All streams but the last are checked without blocking. The last stream is polled with a
         * timeout of at most a tenth of the total timeout, so the other streams are re-checked
         * several times before the deadline.
         *
         * @param i        the maximum time to wait
         * @param timeUnit the unit of {@code i}
         * @return {@code true} as soon as a message is known to be available, {@code false} if the
         *         deadline passed first
         * @throws InterruptedException if the wait on the last stream is interrupted
         */
        public boolean hasNextAvailable(int i, TimeUnit timeUnit) throws InterruptedException {
            if (this.peekedMessage != null) {
                return true;
            }
            long totalMillis = timeUnit.toMillis(i);
            long deadline = System.currentTimeMillis() + totalMillis;
            long pollSliceMillis = totalMillis / 10;
            while (System.currentTimeMillis() < deadline) {
                Iterator<SourceIdAwareBlockingStream> it = this.messageStreams.iterator();
                while (it.hasNext()) {
                    SourceIdAwareBlockingStream current = it.next();
                    if (it.hasNext()) {
                        if (current.hasNextAvailable()) {
                            return true;
                        }
                    } else {
                        // The deadline may pass between the loop check and this point; never hand a
                        // negative timeout to the delegate, and never overflow the int parameter.
                        long remainingMillis = deadline - System.currentTimeMillis();
                        long waitMillis = Math.max(0L, Math.min(pollSliceMillis, remainingMillis));
                        int boundedWait = (int) Math.min((long) Integer.MAX_VALUE, waitMillis);
                        if (current.hasNextAvailable(boundedWait, TimeUnit.MILLISECONDS)) {
                            return true;
                        }
                    }
                }
            }
            return false;
        }

        /**
         * Returns the next message, waiting until one of the sources has a message.
         * <p>
         * A message cached by {@link #peek()} is returned first. Otherwise the sources are peeked
         * repeatedly until at least one candidate shows up; the smallest candidate according to the
         * comparator is then consumed from its stream.
         *
         * @return the next message, carrying the advanced combined token
         * @throws InterruptedException if the thread is interrupted while waiting for or consuming
         *                              a message
         */
        /* renamed from: nextAvailable, reason: merged with bridge method [inline-methods] */
        public TrackedEventMessage<?> m17nextAvailable() throws InterruptedException {
            if (this.peekedMessage != null) {
                TrackedEventMessage<?> next = this.peekedMessage;
                this.peekedMessage = null;
                // The peeked message was given a MultiSourceTrackingToken in doConsumeNext().
                this.trackingToken = (MultiSourceTrackingToken) next.trackingToken();
                return next;
            }
            Map<String, TrackedEventMessage<?>> candidates = new HashMap<>();
            peekForMessages(candidates);
            while (candidates.isEmpty()) {
                // Stay responsive to interruption while polling for the first candidate.
                if (Thread.interrupted()) {
                    throw new InterruptedException("Interrupted while waiting for a message from any source");
                }
                peekForMessages(candidates);
            }
            Map.Entry<String, TrackedEventMessage<?>> chosen = candidates.entrySet().stream()
                    .min(this.trackedEventComparator)
                    .orElseThrow(() -> new IllegalStateException("No candidate message although candidates were found"));
            String sourceId = chosen.getKey();
            TrackedEventMessage<?> consumed = messageSource(sourceId).nextAvailable();
            MultiSourceTrackingToken advancedToken = this.trackingToken.advancedTo(sourceId, consumed.trackingToken());
            this.trackingToken = advancedToken;
            MultiStreamableMessageSource.logger.debug("Message consumed from stream: {}", sourceId);
            return consumed.withTrackingToken(advancedToken);
        }

        /**
         * Peeks every underlying stream and records each available head message under the name of
         * its source.
         *
         * @param map receives one (source name, head message) entry per stream that has a message
         */
        private void peekForMessages(Map<String, TrackedEventMessage<?>> map) {
            for (SourceIdAwareBlockingStream stream : this.messageStreams) {
                stream.peek().ifPresent(message -> map.put(stream.sourceId, message));
            }
        }

        /** Closes every underlying stream. */
        public void close() {
            this.messageStreams.forEach(stream -> stream.close());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/MultiStreamableMessageSource$SourceIdAwareBlockingStream.class */
    /**
     * A {@link BlockingStream} paired with the name of the source it was opened from.
     * Every stream operation is forwarded unchanged to the wrapped stream.
     */
    public static class SourceIdAwareBlockingStream implements BlockingStream<TrackedEventMessage<?>> {
        /** The name of the source the wrapped stream belongs to. */
        private final String sourceId;
        /** The wrapped stream that performs the actual work. */
        private final BlockingStream<TrackedEventMessage<?>> delegate;

        /**
         * @param str            the name of the source the stream was opened from
         * @param blockingStream the stream to forward all calls to
         */
        public SourceIdAwareBlockingStream(String str, BlockingStream<TrackedEventMessage<?>> blockingStream) {
            delegate = blockingStream;
            sourceId = str;
        }

        /** Forwards to the wrapped stream. */
        public Optional<TrackedEventMessage<?>> peek() {
            return delegate.peek();
        }

        /** Forwards to the wrapped stream. */
        public boolean hasNextAvailable() {
            return delegate.hasNextAvailable();
        }

        /** Forwards to the wrapped stream. */
        public boolean hasNextAvailable(int i, TimeUnit timeUnit) throws InterruptedException {
            return delegate.hasNextAvailable(i, timeUnit);
        }

        /** Forwards to the wrapped stream's {@code nextAvailable()}. */
        /* renamed from: nextAvailable, reason: merged with bridge method [inline-methods] */
        public TrackedEventMessage<?> m18nextAvailable() throws InterruptedException {
            TrackedEventMessage<?> next = delegate.nextAvailable();
            return next;
        }

        /** Forwards to the wrapped stream. */
        public Stream<TrackedEventMessage<?>> asStream() {
            return delegate.asStream();
        }

        /** Forwards to the wrapped stream. */
        public void close() {
            delegate.close();
        }
    }

    /**
     * Creates a fresh {@link Builder} with no message sources registered.
     *
     * @return a new {@link Builder} instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Instantiates the message source from the given builder.
     *
     * @param builder supplies the list of identified sources (via {@code messageSources()}) and
     *                the comparator used to order events of different sources
     */
    protected MultiStreamableMessageSource(Builder builder) {
        // The builder returns a newly created list on every call to messageSources().
        this.eventStreams = builder.messageSources();
        this.trackedEventComparator = builder.trackedEventComparator;
    }

    /* renamed from: openStream, reason: merged with bridge method [inline-methods] */
    /**
     * Opens a combined stream over all configured sources.
     *
     * @param trackingToken the position to start from; must be a {@link MultiSourceTrackingToken},
     *                      or {@code null} to start from the token created by
     *                      {@code m15createTailToken()}
     * @return a stream merging the streams of all configured sources
     * @throws IllegalArgumentException if the token is non-null and not a {@link MultiSourceTrackingToken}
     */
    public MultiSourceBlockingStream m16openStream(TrackingToken trackingToken) {
        if (trackingToken instanceof MultiSourceTrackingToken) {
            MultiSourceTrackingToken multiSourceToken = (MultiSourceTrackingToken) trackingToken;
            return new MultiSourceBlockingStream(eventStreams, multiSourceToken, trackedEventComparator);
        }
        if (trackingToken != null) {
            throw new IllegalArgumentException("Incompatible token type provided.");
        }
        // No token supplied: retry with the tail token.
        TrackingToken tailToken = m15createTailToken();
        return m16openStream(tailToken);
    }

    /* renamed from: createTailToken, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a combined token holding the tail token of every configured source.
     *
     * @return a {@link MultiSourceTrackingToken} mapping each source name to the token returned
     *         by that source's {@code createTailToken()}
     */
    public MultiSourceTrackingToken m15createTailToken() {
        Map<String, TrackingToken> tailTokens = new HashMap<>();
        // Collect one token per source, keyed by the source's name.
        this.eventStreams.forEach(source -> tailTokens.put(source.sourceId(), source.createTailToken()));
        return new MultiSourceTrackingToken(tailTokens);
    }

    /* renamed from: createHeadToken, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a combined token holding the head token of every configured source.
     *
     * @return a {@link MultiSourceTrackingToken} mapping each source name to the token returned
     *         by that source's {@code createHeadToken()}
     */
    public MultiSourceTrackingToken m14createHeadToken() {
        Map<String, TrackingToken> headTokens = new HashMap<>();
        // Collect one token per source, keyed by the source's name.
        this.eventStreams.forEach(source -> headTokens.put(source.sourceId(), source.createHeadToken()));
        return new MultiSourceTrackingToken(headTokens);
    }

    /* renamed from: createTokenAt, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a combined token holding, for every configured source, the token that source
     * creates for the given point in time.
     *
     * @param instant the point in time passed to each source's {@code createTokenAt(Instant)}
     * @return a {@link MultiSourceTrackingToken} mapping each source name to its token
     */
    public MultiSourceTrackingToken m13createTokenAt(Instant instant) {
        Map<String, TrackingToken> tokensAtInstant = new HashMap<>();
        // Collect one token per source, keyed by the source's name.
        this.eventStreams.forEach(source -> tokensAtInstant.put(source.sourceId(), source.createTokenAt(instant)));
        return new MultiSourceTrackingToken(tokensAtInstant);
    }

    /* renamed from: createTokenSince, reason: merged with bridge method [inline-methods] */
    /**
     * Creates a combined token holding, for every configured source, the token that source
     * creates for the given duration.
     *
     * @param duration the duration passed to each source's {@code createTokenSince(Duration)}
     * @return a {@link MultiSourceTrackingToken} mapping each source name to its token
     */
    public MultiSourceTrackingToken m12createTokenSince(Duration duration) {
        Map<String, TrackingToken> tokensSince = new HashMap<>();
        // Collect one token per source, keyed by the source's name.
        this.eventStreams.forEach(source -> tokensSince.put(source.sourceId(), source.createTokenSince(duration)));
        return new MultiSourceTrackingToken(tokensSince);
    }
}
