package io.debezium.connector.mongodb.events;

import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Threads;
import java.io.Closeable;
import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/connector/mongodb/events/BufferingChangeStreamCursor.class */
public class BufferingChangeStreamCursor<TResult> implements MongoChangeStreamCursor<ResumableChangeStreamEvent<TResult>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferingChangeStreamCursor.class);
    public static final int THROTTLE_NO_MESSAGE_BEFORE_PAUSE = 5;
    private final EventFetcher<TResult> fetcher;
    private final ExecutorService executor;
    private final DelayStrategy throttler;
    private BsonDocument lastResumeToken;

    /* loaded from: input_file:io/debezium/connector/mongodb/events/BufferingChangeStreamCursor$EventFetcher.class */
    public static final class EventFetcher<TResult> implements Runnable, Closeable {
        public static final long QUEUE_OFFER_TIMEOUT_MS = 100;
        private final ChangeStreamIterable<TResult> stream;
        private final Semaphore capacity;
        private final Queue<ResumableChangeStreamEvent<TResult>> queue;
        private final DelayStrategy throttler;
        private final AtomicBoolean running;
        private final AtomicReference<MongoChangeStreamCursor<ChangeStreamDocument<TResult>>> cursorRef;
        private final MongoDbStreamingChangeEventSourceMetrics metrics;
        private final Clock clock;
        private int noMessageIterations;

        public EventFetcher(ChangeStreamIterable<TResult> changeStreamIterable, int i, MongoDbStreamingChangeEventSourceMetrics mongoDbStreamingChangeEventSourceMetrics, Clock clock, DelayStrategy delayStrategy) {
            this.noMessageIterations = 0;
            this.stream = changeStreamIterable;
            this.capacity = new Semaphore(i);
            this.metrics = mongoDbStreamingChangeEventSourceMetrics;
            this.clock = clock;
            this.throttler = delayStrategy;
            this.running = new AtomicBoolean(false);
            this.cursorRef = new AtomicReference<>(null);
            this.queue = new ConcurrentLinkedQueue();
        }

        public EventFetcher(ChangeStreamIterable<TResult> changeStreamIterable, int i, MongoDbStreamingChangeEventSourceMetrics mongoDbStreamingChangeEventSourceMetrics, Clock clock, Duration duration) {
            this(changeStreamIterable, i, mongoDbStreamingChangeEventSourceMetrics, clock, DelayStrategy.constant(duration));
        }

        public boolean isRunning() {
            return this.running.get();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.running.set(false);
        }

        public ResumableChangeStreamEvent<TResult> poll() {
            ResumableChangeStreamEvent<TResult> poll = this.queue.poll();
            if (poll != null) {
                this.capacity.release();
            }
            return poll;
        }

        public boolean isEmpty() {
            return this.queue.isEmpty();
        }

        public int size() {
            return this.queue.size();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor = this.stream.cursor();
                    try {
                        this.cursorRef.compareAndSet(null, cursor);
                        this.running.set(true);
                        this.noMessageIterations = 0;
                        fetchEvents(cursor);
                        if (cursor != null) {
                            cursor.close();
                        }
                    } catch (Throwable th) {
                        if (cursor != null) {
                            try {
                                cursor.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                    this.cursorRef.set(null);
                    close();
                }
            } catch (InterruptedException e) {
                throw new DebeziumException("Fetcher thread interrupted", e);
            }
        }

        private void fetchEvents(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> mongoChangeStreamCursor) throws InterruptedException {
            ResumableChangeStreamEvent<TResult> resumableChangeStreamEvent = null;
            boolean z = false;
            while (isRunning()) {
                if (!z) {
                    Optional<ResumableChangeStreamEvent<TResult>> fetchEvent = fetchEvent(mongoChangeStreamCursor);
                    if (fetchEvent.isEmpty()) {
                        BufferingChangeStreamCursor.LOGGER.warn("Resume token not available on this poll");
                    } else {
                        resumableChangeStreamEvent = fetchEvent.get();
                    }
                }
                z = !enqueue(resumableChangeStreamEvent);
            }
        }

        private Optional<ResumableChangeStreamEvent<TResult>> fetchEvent(MongoChangeStreamCursor<ChangeStreamDocument<TResult>> mongoChangeStreamCursor) {
            Instant currentTimeAsInstant = this.clock.currentTimeAsInstant();
            ChangeStreamDocument<TResult> changeStreamDocument = (ChangeStreamDocument) mongoChangeStreamCursor.tryNext();
            this.metrics.onSourceEventPolled(changeStreamDocument, this.clock, currentTimeAsInstant);
            throttleIfNeeded(changeStreamDocument);
            return Optional.empty().or(() -> {
                return Optional.ofNullable(changeStreamDocument).map(ResumableChangeStreamEvent::new);
            }).or(() -> {
                return Optional.ofNullable(mongoChangeStreamCursor.getResumeToken()).map(ResumableChangeStreamEvent::new);
            });
        }

        private void throttleIfNeeded(ChangeStreamDocument<TResult> changeStreamDocument) {
            if (changeStreamDocument == null) {
                this.noMessageIterations++;
            }
            if (this.noMessageIterations >= 5) {
                BufferingChangeStreamCursor.LOGGER.debug("Sleeping after {} empty polls", Integer.valueOf(this.noMessageIterations));
                this.throttler.sleepWhen(true);
                this.noMessageIterations = 0;
            }
        }

        private boolean enqueue(ResumableChangeStreamEvent<TResult> resumableChangeStreamEvent) throws InterruptedException {
            if (this.capacity.tryAcquire(100L, TimeUnit.MILLISECONDS)) {
                return this.queue.offer(resumableChangeStreamEvent);
            }
            BufferingChangeStreamCursor.LOGGER.warn("Unable to acquire buffer lock, buffer queue is likely full");
            return false;
        }
    }

    @Immutable
    /* loaded from: input_file:io/debezium/connector/mongodb/events/BufferingChangeStreamCursor$ResumableChangeStreamEvent.class */
    public static final class ResumableChangeStreamEvent<TResult> {
        public final Optional<ChangeStreamDocument<TResult>> document;
        public final BsonDocument resumeToken;

        public ResumableChangeStreamEvent(ChangeStreamDocument<TResult> changeStreamDocument) {
            Objects.requireNonNull(changeStreamDocument);
            this.document = Optional.of(changeStreamDocument);
            this.resumeToken = changeStreamDocument.getResumeToken();
        }

        public ResumableChangeStreamEvent(BsonDocument bsonDocument) {
            Objects.requireNonNull(bsonDocument);
            this.document = Optional.empty();
            this.resumeToken = bsonDocument;
        }

        public boolean isEmpty() {
            return this.document.isEmpty();
        }

        public boolean hasDocument() {
            return this.document.isPresent();
        }

        public String toString() {
            Optional<U> map = this.document.map((v0) -> {
                return v0.toString();
            });
            BsonDocument bsonDocument = this.resumeToken;
            Objects.requireNonNull(bsonDocument);
            return (String) map.orElseGet(bsonDocument::toString);
        }
    }

    public static <TResult> BufferingChangeStreamCursor<TResult> fromIterable(ChangeStreamIterable<TResult> changeStreamIterable, MongoDbTaskContext mongoDbTaskContext, MongoDbStreamingChangeEventSourceMetrics mongoDbStreamingChangeEventSourceMetrics, Clock clock) {
        MongoDbConnectorConfig connectorConfig = mongoDbTaskContext.getConnectorConfig();
        return new BufferingChangeStreamCursor<>(new EventFetcher(changeStreamIterable, connectorConfig.getMaxBatchSize(), mongoDbStreamingChangeEventSourceMetrics, clock, connectorConfig.getPollInterval()), Threads.newFixedThreadPool(MongoDbConnector.class, mongoDbTaskContext.serverName(), "replicator-buffer", 1), connectorConfig.getPollInterval());
    }

    public BufferingChangeStreamCursor(EventFetcher<TResult> eventFetcher, ExecutorService executorService, DelayStrategy delayStrategy) {
        this.lastResumeToken = null;
        this.fetcher = eventFetcher;
        this.executor = executorService;
        this.throttler = delayStrategy;
    }

    public BufferingChangeStreamCursor(EventFetcher<TResult> eventFetcher, ExecutorService executorService, Duration duration) {
        this(eventFetcher, executorService, DelayStrategy.boundedExponential(Duration.ofMillis(1L), duration, 2.0d));
    }

    public BufferingChangeStreamCursor<TResult> start() {
        this.executor.submit(this.fetcher);
        return this;
    }

    /* renamed from: tryNext, reason: merged with bridge method [inline-methods] */
    public ResumableChangeStreamEvent<TResult> m36tryNext() {
        ResumableChangeStreamEvent<TResult> pollWithDelay = pollWithDelay();
        if (pollWithDelay != null) {
            this.lastResumeToken = pollWithDelay.resumeToken;
        }
        return pollWithDelay;
    }

    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public ResumableChangeStreamEvent<TResult> m37next() {
        if (hasNext()) {
            return m36tryNext();
        }
        throw new NoSuchElementException();
    }

    private ResumableChangeStreamEvent<TResult> pollWithDelay() {
        ResumableChangeStreamEvent<TResult> poll;
        do {
            poll = this.fetcher.poll();
        } while (this.throttler.sleepWhen(poll == null));
        return poll;
    }

    public boolean hasNext() {
        return !this.fetcher.isEmpty();
    }

    public int available() {
        return this.fetcher.size();
    }

    public BsonDocument getResumeToken() {
        return this.lastResumeToken;
    }

    public ServerCursor getServerCursor() {
        return ((EventFetcher) this.fetcher).cursorRef.get().getServerCursor();
    }

    public ServerAddress getServerAddress() {
        return ((EventFetcher) this.fetcher).cursorRef.get().getServerAddress();
    }

    public void close() {
        this.fetcher.close();
        this.executor.shutdown();
    }
}
