package org.axonframework.axonserver.connector.event.axon;

import com.google.common.base.Strings;
import io.axoniq.axonserver.connector.event.PersistentStream;
import io.axoniq.axonserver.connector.event.PersistentStreamCallbacks;
import io.axoniq.axonserver.connector.event.PersistentStreamProperties;
import io.axoniq.axonserver.connector.event.PersistentStreamSegment;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.streams.PersistentStreamEvent;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.config.Configuration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.TrackedDomainEventData;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/axonserver/connector/event/axon/PersistentStreamConnection.class */
/**
 * Opens a persistent stream on an Axon Server event channel and forwards the events of every
 * opened segment, in batches, to a registered consumer.
 * <p>
 * Reading is driven by the callbacks passed to {@code openPersistentStream}: each opened segment is
 * tracked by a {@link SegmentConnection}, and "message available" notifications submit a read task
 * to the supplied {@link ScheduledExecutorService}. When the stream is closed with a cause, a
 * reconnect is scheduled after a delay that doubles on each attempt (capped at
 * {@value #MAX_RETRY_SECONDS} seconds) and is reset to one second as soon as a segment is opened.
 */
public class PersistentStreamConnection {

    /** Lower bound on the number of events a single read task may process before yielding. */
    private static final int MAX_MESSAGES_PER_RUN = 10000;

    /**
     * Value acknowledged exactly once after a segment has been marked closed.
     * NOTE(review): presumably the connector's "pending work done" marker - confirm against
     * {@code PersistentStreamSegment}.
     */
    private static final long DONE_MARKER = -45L;

    /** Upper bound, in seconds, for the reconnect delay. */
    private static final int MAX_RETRY_SECONDS = 60;

    private final Logger logger = LoggerFactory.getLogger(PersistentStreamConnection.class);
    private final AtomicReference<PersistentStream> persistentStreamHolder = new AtomicReference<>();
    // Starts out as a no-op so events arriving before open(...) sets a real consumer are ignored.
    private final AtomicReference<Consumer<List<? extends EventMessage<?>>>> consumer =
            new AtomicReference<>(events -> {
            });
    private final Map<Integer, SegmentConnection> segments = new ConcurrentHashMap<>();
    private final AtomicInteger retrySeconds = new AtomicInteger(1);

    private final String streamId;
    private final Configuration configuration;
    private final PersistentStreamProperties persistentStreamProperties;
    private final ScheduledExecutorService scheduler;
    private final int batchSize;
    private final String defaultContext;

    /**
     * Reads, converts and dispatches the events of one segment of the persistent stream.
     */
    private class SegmentConnection {

        /** Held ({@code true}) while a read task is running, so only one task processes the segment. */
        private final AtomicBoolean processGate = new AtomicBoolean();
        /** Ensures the {@link #DONE_MARKER} is acknowledged at most once. */
        private final AtomicBoolean doneConfirmed = new AtomicBoolean();
        private final AtomicBoolean closed = new AtomicBoolean();
        private final PersistentStreamSegment persistentStreamSegment;

        public SegmentConnection(PersistentStreamSegment persistentStreamSegment) {
            this.persistentStreamSegment = persistentStreamSegment;
        }

        /**
         * Submits a read task for this segment unless one currently holds the process gate.
         */
        public void messageAvailable() {
            if (processGate.get()) {
                // A running task re-checks the segment after releasing the gate, so nothing is lost.
                return;
            }
            scheduler.submit(() -> readMessagesFromSegment(persistentStreamSegment));
        }

        /**
         * Processes batches from the given segment until the per-run budget is used up, no event is
         * available, or this connection is closed. Each delivered batch is acknowledged with the
         * token of its last event.
         * <p>
         * On any exit the process gate is released and, when the connection is still open and the
         * segment has another event queued, a follow-up task is submitted.
         *
         * @param persistentStreamSegment the segment to read from
         */
        private void readMessagesFromSegment(PersistentStreamSegment persistentStreamSegment) {
            if (!processGate.compareAndSet(false, true)) {
                return;
            }
            if (logger.isTraceEnabled()) {
                logger.trace("{}[{}] readMessagesFromSegment - closed: {}",
                             streamId, persistentStreamSegment.segment(), persistentStreamSegment.isClosed());
            }
            try {
                int remaining = Math.max(MAX_MESSAGES_PER_RUN, batchSize);
                GrpcMetaDataAwareSerializer serializer =
                        new GrpcMetaDataAwareSerializer(configuration.eventSerializer());
                while (remaining > 0 && !closed.get()) {
                    List<PersistentStreamEvent> batch = readBatch(persistentStreamSegment);
                    if (batch.isEmpty()) {
                        break;
                    }
                    List<TrackedEventMessage<?>> eventMessages = upcastAndDeserialize(batch, serializer);
                    // Re-check: the connection may have been closed while the batch was being read.
                    if (!closed.get()) {
                        consumer.get().accept(eventMessages);
                        if (logger.isTraceEnabled()) {
                            logger.trace("{}/{} processed {} entries",
                                         streamId, persistentStreamSegment.segment(), eventMessages.size());
                        }
                        long lastToken = batch.get(batch.size() - 1).getEvent().getToken();
                        persistentStreamSegment.acknowledge(lastToken);
                        remaining -= batch.size();
                    }
                }
                acknowledgeDoneWhenClosed(persistentStreamSegment);
            } catch (StreamClosedException e) {
                // Must precede the generic handler: a closed stream is an expected way to stop and
                // must not be reported back to the segment as a processing error.
                logger.debug("{}: Stream closed for segment {}", streamId, persistentStreamSegment.segment());
                close();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Mark closed before reporting, so that a failure while reporting can never lead to
                // the task being resubmitted and reading past the failed batch.
                close();
                logger.warn("{}: Exception while processing events for segment {}",
                            streamId, persistentStreamSegment.segment(), e);
                persistentStreamSegment.error(describe(e));
            } finally {
                processGate.set(false);
                // Events may have arrived while the gate was held; pick them up in a new task.
                if (!closed.get() && persistentStreamSegment.peek() != null) {
                    scheduler.submit(() -> readMessagesFromSegment(persistentStreamSegment));
                }
            }
        }

        /**
         * Returns a non-null description of the given exception: its message when present,
         * otherwise its {@code toString()} (e.g. for an {@link InterruptedException} or
         * {@link NullPointerException} created without a message).
         */
        private String describe(Exception e) {
            String message = e.getMessage();
            return message != null ? message : e.toString();
        }

        /**
         * Collects up to {@code batchSize} events from the segment. The first event is taken only
         * if immediately available; each further event is waited for at most one millisecond.
         *
         * @param persistentStreamSegment the segment to read from
         * @return the events read, empty when none was immediately available
         * @throws InterruptedException when interrupted while waiting for a further event
         */
        private List<PersistentStreamEvent> readBatch(PersistentStreamSegment persistentStreamSegment)
                throws InterruptedException {
            List<PersistentStreamEvent> batch = new LinkedList<>();
            PersistentStreamEvent first = persistentStreamSegment.nextIfAvailable();
            if (first == null) {
                return batch;
            }
            batch.add(first);
            while (batch.size() < batchSize && !closed.get()) {
                PersistentStreamEvent next = persistentStreamSegment.nextIfAvailable(1L, TimeUnit.MILLISECONDS);
                if (next == null) {
                    break;
                }
                batch.add(next);
            }
            return batch;
        }

        /**
         * Wraps every event of the batch with its tracking token and runs the result through the
         * configured upcaster chain and the given serializer.
         *
         * @param batch      the raw events read from the segment
         * @param serializer the serializer used to deserialize the events
         * @return the resulting tracked event messages
         */
        private List<TrackedEventMessage<?>> upcastAndDeserialize(List<PersistentStreamEvent> batch,
                                                                  GrpcMetaDataAwareSerializer serializer) {
            return EventUtils.upcastAndDeserializeTrackedEvents(
                                     batch.stream()
                                          .map(event -> new TrackedDomainEventData<>(
                                                  createToken(event),
                                                  new GrpcBackedDomainEventData(event.getEvent().getEvent()))),
                                     serializer,
                                     configuration.upcasterChain())
                             .collect(Collectors.toList());
        }

        /**
         * Acknowledges the {@link #DONE_MARKER} on the segment, once, if this connection is closed.
         */
        private void acknowledgeDoneWhenClosed(PersistentStreamSegment persistentStreamSegment) {
            if (closed.get() && doneConfirmed.compareAndSet(false, true)) {
                persistentStreamSegment.acknowledge(DONE_MARKER);
            }
        }

        /**
         * Builds the tracking token for an event: a {@link GlobalSequenceTrackingToken} on the
         * event's token, wrapped in a replay token (reset position {@code token + 1}) when the
         * event is flagged as a replay.
         */
        private TrackingToken createToken(PersistentStreamEvent event) {
            long token = event.getEvent().getToken();
            if (!event.getReplay()) {
                return new GlobalSequenceTrackingToken(token);
            }
            return ReplayToken.createReplayToken(new GlobalSequenceTrackingToken(token + 1),
                                                 new GlobalSequenceTrackingToken(token));
        }

        /**
         * Marks this connection closed; a running read task stops at its next check.
         */
        public void close() {
            closed.set(true);
        }
    }

    /**
     * Creates a connection without an explicit context; the context is then taken from the
     * {@link AxonServerConfiguration} when the stream is opened.
     *
     * @param str                        the identifier of the persistent stream
     * @param configuration              supplies the connection manager, server configuration,
     *                                   event serializer and upcaster chain
     * @param persistentStreamProperties the properties passed on when opening the stream
     * @param scheduledExecutorService   executes read tasks and scheduled reconnects
     * @param i                          the maximum number of events per batch
     */
    public PersistentStreamConnection(String str, Configuration configuration,
                                      PersistentStreamProperties persistentStreamProperties,
                                      ScheduledExecutorService scheduledExecutorService, int i) {
        this(str, configuration, persistentStreamProperties, scheduledExecutorService, i, null);
    }

    /**
     * Creates a connection for the given context.
     *
     * @param str                        the identifier of the persistent stream
     * @param configuration              supplies the connection manager, server configuration,
     *                                   event serializer and upcaster chain
     * @param persistentStreamProperties the properties passed on when opening the stream
     * @param scheduledExecutorService   executes read tasks and scheduled reconnects
     * @param i                          the maximum number of events per batch
     * @param str2                       the context to connect to; when {@code null} or empty the
     *                                   context of the {@link AxonServerConfiguration} is used
     */
    public PersistentStreamConnection(String str, Configuration configuration,
                                      PersistentStreamProperties persistentStreamProperties,
                                      ScheduledExecutorService scheduledExecutorService, int i, String str2) {
        this.streamId = str;
        this.configuration = configuration;
        this.persistentStreamProperties = persistentStreamProperties;
        this.scheduler = scheduledExecutorService;
        this.batchSize = i;
        this.defaultContext = str2;
    }

    /**
     * Registers the consumer that receives the event batches and opens the persistent stream.
     *
     * @param consumer receives each batch of deserialized events
     */
    public void open(Consumer<List<? extends EventMessage<?>>> consumer) {
        this.consumer.set(consumer);
        start();
    }

    /**
     * Opens the persistent stream on the event channel of the selected context and stores it so
     * that {@link #close()} can close it.
     */
    private void start() {
        AxonServerConnectionManager connectionManager =
                configuration.getComponent(AxonServerConnectionManager.class);
        AxonServerConfiguration serverConfiguration = configuration.getComponent(AxonServerConfiguration.class);
        String context = Strings.isNullOrEmpty(defaultContext) ? serverConfiguration.getContext() : defaultContext;
        PersistentStreamCallbacks callbacks = new PersistentStreamCallbacks(this::segmentOpened,
                                                                            this::segmentClosed,
                                                                            this::messageAvailable,
                                                                            this::streamClosed);
        PersistentStream persistentStream = connectionManager
                .getConnection(context)
                .eventChannel()
                .openPersistentStream(streamId,
                                      serverConfiguration.getEventFlowControl().getPermits().intValue(),
                                      serverConfiguration.getEventFlowControl().getNrOfNewPermits().intValue(),
                                      callbacks,
                                      persistentStreamProperties);
        persistentStreamHolder.set(persistentStream);
    }

    /** Starts tracking a newly opened segment and resets the reconnect delay to one second. */
    private void segmentOpened(PersistentStreamSegment persistentStreamSegment) {
        logger.info("Segment opened: {}", persistentStreamSegment);
        retrySeconds.set(1);
        segments.put(persistentStreamSegment.segment(), new SegmentConnection(persistentStreamSegment));
    }

    /** Stops tracking a segment and marks its connection, if any, as closed. */
    private void segmentClosed(PersistentStreamSegment persistentStreamSegment) {
        SegmentConnection segmentConnection = segments.remove(persistentStreamSegment.segment());
        if (segmentConnection != null) {
            segmentConnection.close();
        }
        logger.info("Segment closed: {}", persistentStreamSegment);
    }

    /** Forwards the notification to the connection tracking the segment; unknown segments are ignored. */
    private void messageAvailable(PersistentStreamSegment persistentStreamSegment) {
        SegmentConnection segmentConnection = segments.get(persistentStreamSegment.segment());
        if (segmentConnection != null) {
            segmentConnection.messageAvailable();
        }
    }

    /**
     * Clears the stored stream and, when a cause is given, schedules a reconnect using the current
     * retry delay, which is doubled (up to {@value #MAX_RETRY_SECONDS} seconds) for the next attempt.
     *
     * @param th the reason the stream was closed, or {@code null} when no reconnect is wanted
     */
    private void streamClosed(Throwable th) {
        persistentStreamHolder.set(null);
        if (th != null) {
            logger.info("{}: Rescheduling persistent stream", streamId, th);
            int delaySeconds = retrySeconds.getAndUpdate(current -> Math.min(MAX_RETRY_SECONDS, current * 2));
            scheduler.schedule(this::start, delaySeconds, TimeUnit.SECONDS);
        }
    }

    /**
     * Closes the currently open persistent stream, if there is one.
     */
    public void close() {
        PersistentStream persistentStream = persistentStreamHolder.getAndSet(null);
        if (persistentStream != null) {
            persistentStream.close();
        }
    }
}
