package org.opensearch.migrations.replay.kafka;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.tracing.IKafkaConsumerContexts;
import org.opensearch.migrations.replay.tracing.ITrafficSourceContexts;
import org.opensearch.migrations.replay.tracing.KafkaConsumerContexts;
import org.opensearch.migrations.replay.tracing.RootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer.class */
public class TrackingKafkaConsumer implements ConsumerRebalanceListener {
    private static final Logger log;
    public static final int POLL_TIMEOUT_KEEP_ALIVE_DIVISOR = 4;

    @NonNull
    private final RootReplayerContext globalContext;
    private final Consumer<String, byte[]> kafkaConsumer;
    final String topic;
    private final Clock clock;
    final Map<Integer, OffsetLifecycleTracker> partitionToOffsetLifecycleTrackerMap;
    private final Object commitDataLock = new Object();
    final Map<TopicPartition, OffsetAndMetadata> nextSetOfCommitsMap;
    final Map<TopicPartition, PriorityQueue<OrderedKeyHolder>> nextSetOfKeysContextsBeingCommitted;
    final java.util.function.Consumer<ITrafficStreamKey> onCommitKeyCallback;
    private final Duration keepAliveInterval;
    private final AtomicReference<Instant> lastTouchTimeRef;
    private final AtomicInteger consumerConnectionGeneration;
    private final AtomicInteger kafkaRecordsLeftToCommitEventually;
    private final AtomicBoolean kafkaRecordsReadyToCommit;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opensearch/migrations/replay/kafka/TrackingKafkaConsumer$OrderedKeyHolder.class */
    public static class OrderedKeyHolder implements Comparable<OrderedKeyHolder> {
        final long offset;

        @NonNull
        final ITrafficStreamKey tsk;

        @Override // java.lang.Comparable
        public int compareTo(OrderedKeyHolder orderedKeyHolder) {
            return Long.compare(this.offset, orderedKeyHolder.offset);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            OrderedKeyHolder orderedKeyHolder = (OrderedKeyHolder) obj;
            if (this.offset != orderedKeyHolder.offset) {
                return false;
            }
            return this.tsk.equals(orderedKeyHolder.tsk);
        }

        public int hashCode() {
            return Long.valueOf(this.offset).hashCode();
        }

        public OrderedKeyHolder(long j, @NonNull ITrafficStreamKey iTrafficStreamKey) {
            if (iTrafficStreamKey == null) {
                throw new NullPointerException("tsk is marked non-null but is null");
            }
            this.offset = j;
            this.tsk = iTrafficStreamKey;
        }

        public long getOffset() {
            return this.offset;
        }

        @NonNull
        public ITrafficStreamKey getTsk() {
            return this.tsk;
        }
    }

    public TrackingKafkaConsumer(@NonNull RootReplayerContext rootReplayerContext, Consumer<String, byte[]> consumer, String str, Duration duration, Clock clock, java.util.function.Consumer<ITrafficStreamKey> consumer2) {
        if (rootReplayerContext == null) {
            throw new NullPointerException("globalContext is marked non-null but is null");
        }
        this.globalContext = rootReplayerContext;
        this.kafkaConsumer = consumer;
        this.topic = str;
        this.clock = clock;
        this.partitionToOffsetLifecycleTrackerMap = new HashMap();
        this.nextSetOfCommitsMap = new HashMap();
        this.nextSetOfKeysContextsBeingCommitted = new HashMap();
        this.lastTouchTimeRef = new AtomicReference<>(Instant.EPOCH);
        this.consumerConnectionGeneration = new AtomicInteger();
        this.kafkaRecordsLeftToCommitEventually = new AtomicInteger();
        this.kafkaRecordsReadyToCommit = new AtomicBoolean();
        this.keepAliveInterval = duration;
        this.onCommitKeyCallback = consumer2;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            log.atDebug().setMessage(() -> {
                return this + " revoked no partitions.";
            }).log();
            return;
        }
        new KafkaConsumerContexts.AsyncListeningContext(this.globalContext).onPartitionsRevoked(collection);
        synchronized (this.commitDataLock) {
            RootReplayerContext rootReplayerContext = this.globalContext;
            Objects.requireNonNull(rootReplayerContext);
            safeCommit(rootReplayerContext::createCommitContext);
            collection.forEach(topicPartition -> {
                TopicPartition topicPartition = new TopicPartition(this.topic, topicPartition.partition());
                this.nextSetOfCommitsMap.remove(topicPartition);
                this.nextSetOfKeysContextsBeingCommitted.remove(topicPartition);
                this.partitionToOffsetLifecycleTrackerMap.remove(Integer.valueOf(topicPartition.partition()));
            });
            this.kafkaRecordsLeftToCommitEventually.set(this.partitionToOffsetLifecycleTrackerMap.values().stream().mapToInt((v0) -> {
                return v0.size();
            }).sum());
            this.kafkaRecordsReadyToCommit.set(!this.nextSetOfCommitsMap.values().isEmpty());
            log.atWarn().setMessage(() -> {
                return this + " partitions revoked for " + ((String) collection.stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(",")));
            }).log();
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (collection.isEmpty()) {
            log.atInfo().setMessage(() -> {
                return this + " assigned no new partitions.";
            }).log();
            return;
        }
        new KafkaConsumerContexts.AsyncListeningContext(this.globalContext).onPartitionsAssigned(collection);
        synchronized (this.commitDataLock) {
            this.consumerConnectionGeneration.incrementAndGet();
            collection.forEach(topicPartition -> {
                this.partitionToOffsetLifecycleTrackerMap.computeIfAbsent(Integer.valueOf(topicPartition.partition()), num -> {
                    return new OffsetLifecycleTracker(this.consumerConnectionGeneration.get());
                });
            });
            log.atInfo().setMessage(() -> {
                return this + " partitions added for " + ((String) collection.stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(",")));
            }).log();
        }
    }

    public void close() {
        log.atInfo().setMessage(() -> {
            return "Kafka consumer closing.  Committing (implicitly by Kafka's consumer): " + nextCommitsToString();
        }).log();
        this.kafkaConsumer.close();
    }

    public Optional<Instant> getNextRequiredTouch() {
        Optional<Instant> of;
        Instant instant = this.lastTouchTimeRef.get();
        if (this.kafkaRecordsLeftToCommitEventually.get() == 0) {
            of = Optional.empty();
        } else {
            of = Optional.of(this.kafkaRecordsReadyToCommit.get() ? Instant.now() : instant.plus((TemporalAmount) this.keepAliveInterval));
        }
        Optional<Instant> optional = of;
        log.atTrace().setMessage(() -> {
            return "returning next required touch at " + ((String) optional.map(instant2 -> {
                return instant2;
            }).orElse("N/A")) + " from a lastTouchTime of " + instant;
        }).log();
        return optional;
    }

    /* JADX WARN: Finally extract failed */
    public void touch(ITrafficSourceContexts.IBackPressureBlockContext iBackPressureBlockContext) {
        IKafkaConsumerContexts.IPollScopeContext createNewPollContext;
        IKafkaConsumerContexts.ITouchScopeContext createNewTouchContext = iBackPressureBlockContext.createNewTouchContext();
        try {
            log.trace("touch() called.");
            pause();
            try {
                try {
                    createNewPollContext = createNewTouchContext.createNewPollContext();
                    try {
                    } catch (Throwable th) {
                        if (createNewPollContext != null) {
                            try {
                                createNewPollContext.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    resume();
                    throw th3;
                }
            } catch (IllegalStateException e) {
                throw e;
            } catch (RuntimeException e2) {
                log.atWarn().setCause(e2).setMessage("Unable to poll the topic: " + this.topic + " with our Kafka consumer. Swallowing and awaiting next metadata refresh to try again.").log();
                resume();
            }
            if (!this.kafkaConsumer.poll(Duration.ZERO).isEmpty()) {
                throw new IllegalStateException("Expected no entries once the consumer was paused.  This may have happened because a new assignment slipped into the consumer AFTER pause calls.");
            }
            if (createNewPollContext != null) {
                createNewPollContext.close();
            }
            resume();
            Objects.requireNonNull(iBackPressureBlockContext);
            safeCommit(iBackPressureBlockContext::createCommitContext);
            this.lastTouchTimeRef.set(this.clock.instant());
            if (createNewTouchContext != null) {
                createNewTouchContext.close();
            }
        } catch (Throwable th4) {
            if (createNewTouchContext != null) {
                try {
                    createNewTouchContext.close();
                } catch (Throwable th5) {
                    th4.addSuppressed(th5);
                }
            }
            throw th4;
        }
    }

    private void pause() {
        Set assignment = this.kafkaConsumer.assignment();
        try {
            this.kafkaConsumer.pause(assignment);
        } catch (IllegalStateException e) {
            log.atError().setCause(e).setMessage(() -> {
                return "Unable to pause the topic partitions: " + this.topic + ".  The active partitions passed here : " + ((String) assignment.stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(","))) + ".  The active partitions as tracked here are: " + ((String) getActivePartitions().stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(","))) + ".  The active partitions according to the consumer:  " + ((String) this.kafkaConsumer.assignment().stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(",")));
            }).log();
        }
    }

    private void resume() {
        Set assignment = this.kafkaConsumer.assignment();
        try {
            this.kafkaConsumer.resume(assignment);
        } catch (IllegalStateException e) {
            log.atError().setCause(e).setMessage(() -> {
                return "Unable to resume the topic partitions: " + this.topic + ".  This may not be a fatal error for the entire process as the consumer should eventually rejoin and rebalance.  The active partitions passed here : " + ((String) assignment.stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(","))) + ".  The active partitions as tracked here are: " + ((String) getActivePartitions().stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(","))) + ".  The active partitions according to the consumer:  " + ((String) this.kafkaConsumer.assignment().stream().map((v0) -> {
                    return String.valueOf(v0);
                }).collect(Collectors.joining(",")));
            }).log();
        }
    }

    private Collection<TopicPartition> getActivePartitions() {
        return (Collection) this.partitionToOffsetLifecycleTrackerMap.keySet().stream().map(num -> {
            return new TopicPartition(this.topic, num.intValue());
        }).collect(Collectors.toList());
    }

    public <T> Stream<T> getNextBatchOfRecords(ITrafficSourceContexts.IReadChunkContext iReadChunkContext, BiFunction<KafkaCommitOffsetData, ConsumerRecord<String, byte[]>, T> biFunction) {
        Objects.requireNonNull(iReadChunkContext);
        safeCommit(iReadChunkContext::createCommitContext);
        ConsumerRecords<String, byte[]> safePollWithSwallowedRuntimeExceptions = safePollWithSwallowedRuntimeExceptions(iReadChunkContext);
        Objects.requireNonNull(iReadChunkContext);
        safeCommit(iReadChunkContext::createCommitContext);
        return applyBuilder(biFunction, safePollWithSwallowedRuntimeExceptions);
    }

    private <T> Stream<T> applyBuilder(BiFunction<KafkaCommitOffsetData, ConsumerRecord<String, byte[]>, T> biFunction, ConsumerRecords<String, byte[]> consumerRecords) {
        return StreamSupport.stream(consumerRecords.spliterator(), false).map(consumerRecord -> {
            OffsetLifecycleTracker offsetLifecycleTracker = this.partitionToOffsetLifecycleTrackerMap.get(Integer.valueOf(consumerRecord.partition()));
            PojoKafkaCommitOffsetData pojoKafkaCommitOffsetData = new PojoKafkaCommitOffsetData(offsetLifecycleTracker.consumerConnectionGeneration, consumerRecord.partition(), consumerRecord.offset());
            offsetLifecycleTracker.add(pojoKafkaCommitOffsetData.getOffset());
            this.kafkaRecordsLeftToCommitEventually.incrementAndGet();
            log.atTrace().setMessage(() -> {
                return "records in flight=" + this.kafkaRecordsLeftToCommitEventually.get();
            }).log();
            return biFunction.apply(pojoKafkaCommitOffsetData, consumerRecord);
        });
    }

    private ConsumerRecords<String, byte[]> safePollWithSwallowedRuntimeExceptions(ITrafficSourceContexts.IReadChunkContext iReadChunkContext) {
        try {
            this.lastTouchTimeRef.set(this.clock.instant());
            IKafkaConsumerContexts.IPollScopeContext createPollContext = iReadChunkContext.createPollContext();
            try {
                ConsumerRecords<String, byte[]> poll = this.kafkaConsumer.poll(this.keepAliveInterval.dividedBy(4L));
                if (createPollContext != null) {
                    createPollContext.close();
                }
                log.atLevel(poll.isEmpty() ? Level.TRACE : Level.INFO).setMessage(() -> {
                    return "Kafka consumer poll has fetched " + poll.count() + " records.  Records in flight=" + this.kafkaRecordsLeftToCommitEventually.get();
                }).log();
                log.atTrace().setMessage("{}").addArgument(() -> {
                    return "All positions: {" + ((String) this.kafkaConsumer.assignment().stream().map(topicPartition -> {
                        return topicPartition + ": " + this.kafkaConsumer.position(topicPartition);
                    }).collect(Collectors.joining(","))) + "}";
                }).log();
                log.atTrace().setMessage("{}").addArgument(() -> {
                    return "All previously COMMITTED positions: {" + ((String) this.kafkaConsumer.assignment().stream().map(topicPartition -> {
                        return topicPartition + ": " + this.kafkaConsumer.committed(topicPartition);
                    }).collect(Collectors.joining(","))) + "}";
                }).log();
                return poll;
            } finally {
            }
        } catch (RuntimeException e) {
            log.atWarn().setCause(e).setMessage("Unable to poll the topic: {} with our Kafka consumer. Swallowing and awaiting next metadata refresh to try again.").addArgument(this.topic).log();
            return new ConsumerRecords<>(Collections.emptyMap());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ITrafficCaptureSource.CommitResult commitKafkaKey(ITrafficStreamKey iTrafficStreamKey, KafkaCommitOffsetData kafkaCommitOffsetData) {
        OffsetLifecycleTracker offsetLifecycleTracker;
        synchronized (this.commitDataLock) {
            offsetLifecycleTracker = this.partitionToOffsetLifecycleTrackerMap.get(Integer.valueOf(kafkaCommitOffsetData.getPartition()));
        }
        if (offsetLifecycleTracker == null || offsetLifecycleTracker.consumerConnectionGeneration != kafkaCommitOffsetData.getGeneration()) {
            log.atWarn().setMessage(() -> {
                return "trafficKey's generation (" + kafkaCommitOffsetData.getGeneration() + ") is not current (" + ((String) Optional.ofNullable(offsetLifecycleTracker).map(offsetLifecycleTracker2 -> {
                    return "new generation=" + offsetLifecycleTracker2.consumerConnectionGeneration;
                }).orElse("Partition unassigned")) + ").  Dropping this commit request since the record would have been handled again by a current consumer within this process or another. Full key=" + kafkaCommitOffsetData;
            }).log();
            return ITrafficCaptureSource.CommitResult.IGNORED;
        }
        TopicPartition topicPartition = new TopicPartition(this.topic, kafkaCommitOffsetData.getPartition());
        return (ITrafficCaptureSource.CommitResult) offsetLifecycleTracker.removeAndReturnNewHead(kafkaCommitOffsetData.getOffset()).map(l -> {
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(l.longValue());
            log.atDebug().setMessage(() -> {
                return "Adding new commit " + topicPartition + "->" + offsetAndMetadata + " to map";
            }).log();
            synchronized (this.commitDataLock) {
                addKeyContextForEventualCommit(iTrafficStreamKey, kafkaCommitOffsetData, topicPartition);
                this.nextSetOfCommitsMap.put(topicPartition, offsetAndMetadata);
            }
            return ITrafficCaptureSource.CommitResult.AFTER_NEXT_READ;
        }).orElseGet(() -> {
            synchronized (this.commitDataLock) {
                addKeyContextForEventualCommit(iTrafficStreamKey, kafkaCommitOffsetData, topicPartition);
            }
            return ITrafficCaptureSource.CommitResult.BLOCKED_BY_OTHER_COMMITS;
        });
    }

    private void addKeyContextForEventualCommit(ITrafficStreamKey iTrafficStreamKey, KafkaCommitOffsetData kafkaCommitOffsetData, TopicPartition topicPartition) {
        this.nextSetOfKeysContextsBeingCommitted.computeIfAbsent(topicPartition, topicPartition2 -> {
            return new PriorityQueue();
        }).add(new OrderedKeyHolder(kafkaCommitOffsetData.getOffset(), iTrafficStreamKey));
    }

    private void safeCommit(Supplier<IKafkaConsumerContexts.ICommitScopeContext> supplier) {
        synchronized (this.commitDataLock) {
            if (this.nextSetOfCommitsMap.isEmpty()) {
                return;
            }
            IKafkaConsumerContexts.ICommitScopeContext iCommitScopeContext = supplier.get();
            HashMap hashMap = new HashMap(this.nextSetOfCommitsMap);
            try {
                try {
                    safeCommitStatic(iCommitScopeContext, this.kafkaConsumer, hashMap);
                    synchronized (this.commitDataLock) {
                        hashMap.entrySet().stream().forEach(entry -> {
                            callbackUpTo(this.onCommitKeyCallback, this.nextSetOfKeysContextsBeingCommitted.get(entry.getKey()), ((OffsetAndMetadata) entry.getValue()).offset());
                        });
                        hashMap.forEach((topicPartition, offsetAndMetadata) -> {
                            this.nextSetOfCommitsMap.remove(topicPartition);
                        });
                    }
                    log.trace("partitionToOffsetLifecycleTrackerMap=" + this.partitionToOffsetLifecycleTrackerMap);
                    this.kafkaRecordsLeftToCommitEventually.set(this.partitionToOffsetLifecycleTrackerMap.values().stream().mapToInt((v0) -> {
                        return v0.size();
                    }).sum());
                    log.atDebug().setMessage(() -> {
                        return "Done committing now records in flight=" + this.kafkaRecordsLeftToCommitEventually.get();
                    }).log();
                    if (iCommitScopeContext != null) {
                        iCommitScopeContext.close();
                    }
                } catch (RuntimeException e) {
                    log.atWarn().setCause(e).setMessage(() -> {
                        return "Error while committing.  Another consumer may already be processing messages before these commits.  Commits ARE NOT being discarded here, with the expectation that the revoked callback (onPartitionsRevoked) will be called.  Within that method, commits for unassigned partitions will be discarded.  After that, touch() or poll() will trigger another commit attempt.Those calls will occur in the near future if assigned partitions have pending commits." + ((String) this.nextSetOfCommitsMap.entrySet().stream().map(entry2 -> {
                            return entry2.getKey() + "->" + entry2.getValue();
                        }).collect(Collectors.joining(",")));
                    }).log();
                    if (iCommitScopeContext != null) {
                        iCommitScopeContext.close();
                    }
                }
            } catch (Throwable th) {
                if (iCommitScopeContext != null) {
                    iCommitScopeContext.close();
                }
                throw th;
            }
        }
    }

    private static void safeCommitStatic(IKafkaConsumerContexts.ICommitScopeContext iCommitScopeContext, Consumer<String, byte[]> consumer, HashMap<TopicPartition, OffsetAndMetadata> hashMap) {
        if (!$assertionsDisabled && hashMap.isEmpty()) {
            throw new AssertionError();
        }
        log.atDebug().setMessage(() -> {
            return "Committing " + hashMap;
        }).log();
        IKafkaConsumerContexts.IKafkaCommitScopeContext createNewKafkaCommitContext = iCommitScopeContext.createNewKafkaCommitContext();
        try {
            consumer.commitSync(hashMap);
            if (createNewKafkaCommitContext != null) {
                createNewKafkaCommitContext.close();
            }
        } catch (Throwable th) {
            if (createNewKafkaCommitContext != null) {
                try {
                    createNewKafkaCommitContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static void callbackUpTo(java.util.function.Consumer<ITrafficStreamKey> consumer, PriorityQueue<OrderedKeyHolder> priorityQueue, long j) {
        OrderedKeyHolder peek = priorityQueue.peek();
        while (true) {
            OrderedKeyHolder orderedKeyHolder = peek;
            if (orderedKeyHolder == null || orderedKeyHolder.offset > j) {
                return;
            }
            consumer.accept(orderedKeyHolder.tsk);
            priorityQueue.poll();
            peek = priorityQueue.peek();
        }
    }

    String nextCommitsToString() {
        return "nextCommits=" + ((String) this.nextSetOfCommitsMap.entrySet().stream().map(entry -> {
            return entry.getKey() + "->" + entry.getValue();
        }).collect(Collectors.joining(",")));
    }

    public String toString() {
        String format;
        synchronized (this.commitDataLock) {
            format = String.format("TrackingKafkaConsumer{topic='%s', partitionCount=%d, commitsPending=%d, recordsLeftToCommit=%d, recordsReadyToCommit=%b}", this.topic, Integer.valueOf(this.partitionToOffsetLifecycleTrackerMap.size()), Integer.valueOf(this.nextSetOfCommitsMap.size()), Integer.valueOf(this.kafkaRecordsLeftToCommitEventually.get()), Boolean.valueOf(this.kafkaRecordsReadyToCommit.get()));
        }
        return format;
    }

    static {
        $assertionsDisabled = !TrackingKafkaConsumer.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(TrackingKafkaConsumer.class);
    }
}
