package org.apache.kafka.connect.mirror;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/OffsetSyncWriter.class */
/**
 * Queues {@link OffsetSync} records per topic-partition and sends them to the offset-syncs topic.
 *
 * <p>Syncs are held in two insertion-ordered maps keyed by topic-partition:
 * <ul>
 *   <li><em>pending</em> syncs are sent by {@link #firePendingOffsetSyncs()};</li>
 *   <li><em>delayed</em> syncs are only sent after {@link #promoteDelayedOffsetSyncs()} moves them
 *       into the pending map.</li>
 * </ul>
 * Each map keeps at most one sync per topic-partition; a newer sync replaces the older one.
 *
 * <p>The number of in-flight sends is bounded by a {@link Semaphore}: a permit is taken before each
 * send and released from the producer callback.
 *
 * <p>Access to the pending and delayed maps inside this class is guarded by this instance's monitor.
 * The partition-state map is read and written without that lock.
 */
class OffsetSyncWriter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class);

    /** Number of semaphore permits created by the config-based constructor. */
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
    private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
    private final Semaphore outstandingOffsetSyncs;
    private final KafkaProducer<byte[], byte[]> offsetProducer;
    private final String offsetSyncsTopic;
    private final long maxOffsetLag;

    /**
     * Per-partition bookkeeping used to decide whether a newly observed pair of
     * upstream/downstream offsets should be emitted as an offset sync.
     */
    static class PartitionState {
        long previousUpstreamOffset = -1;
        long previousDownstreamOffset = -1;
        long lastSyncDownstreamOffset = -1;
        long maxOffsetLag;
        boolean shouldSyncOffsets;

        /**
         * @param maxOffsetLag threshold compared against the distance between the current
         *                     downstream offset and one past the last synced downstream offset
         */
        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        /**
         * Records the latest offsets and reports whether a sync should be emitted.
         *
         * <p>The sync flag is raised when any of the following holds:
         * <ul>
         *   <li>no sync has been recorded yet ({@code lastSyncDownstreamOffset == -1});</li>
         *   <li>the downstream offset is at least {@code maxOffsetLag} beyond
         *       {@code lastSyncDownstreamOffset + 1};</li>
         *   <li>the upstream offset is not exactly one greater than the previous upstream offset;</li>
         *   <li>the downstream offset is lower than the previous downstream offset.</li>
         * </ul>
         * The flag is sticky: once raised it remains {@code true} on subsequent calls until
         * {@link #reset()} is invoked.
         *
         * @param upstreamOffset   offset of the record in the source partition
         * @param downstreamOffset offset of the record in the target partition
         * @return the current value of the sync flag
         */
        boolean update(long upstreamOffset, long downstreamOffset) {
            boolean noPreviousSync = this.lastSyncDownstreamOffset == -1;
            // Lag is measured from one past the last synced downstream offset.
            boolean lagExceeded =
                    downstreamOffset - (this.lastSyncDownstreamOffset + 1) >= this.maxOffsetLag;
            boolean upstreamNotConsecutive = upstreamOffset - this.previousUpstreamOffset != 1;
            boolean downstreamWentBackwards = downstreamOffset < this.previousDownstreamOffset;
            if (noPreviousSync || lagExceeded || upstreamNotConsecutive || downstreamWentBackwards) {
                this.lastSyncDownstreamOffset = downstreamOffset;
                this.shouldSyncOffsets = true;
            }
            this.previousUpstreamOffset = upstreamOffset;
            this.previousDownstreamOffset = downstreamOffset;
            return this.shouldSyncOffsets;
        }

        /** Two states are equal when all five fields match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PartitionState)) {
                return false;
            }
            PartitionState other = (PartitionState) obj;
            return this.previousUpstreamOffset == other.previousUpstreamOffset
                    && this.previousDownstreamOffset == other.previousDownstreamOffset
                    && this.lastSyncDownstreamOffset == other.lastSyncDownstreamOffset
                    && this.maxOffsetLag == other.maxOffsetLag
                    && this.shouldSyncOffsets == other.shouldSyncOffsets;
        }

        /** Hashes the same five fields that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return Objects.hash(
                    this.previousUpstreamOffset,
                    this.previousDownstreamOffset,
                    this.lastSyncDownstreamOffset,
                    this.maxOffsetLag,
                    this.shouldSyncOffsets);
        }

        /** Clears the sync flag; the tracked offsets are left untouched. */
        void reset() {
            this.shouldSyncOffsets = false;
        }
    }

    /**
     * Creates a writer from task configuration: builds its own producer via
     * {@code MirrorUtils.newProducer} and a semaphore with
     * {@code MAX_OUTSTANDING_OFFSET_SYNCS} permits.
     *
     * @param mirrorSourceTaskConfig supplies the offset-syncs topic name, the producer
     *                               configuration and the maximum offset lag
     */
    public OffsetSyncWriter(MirrorSourceTaskConfig mirrorSourceTaskConfig) {
        this.outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        this.offsetSyncsTopic = mirrorSourceTaskConfig.offsetSyncsTopic();
        this.offsetProducer = MirrorUtils.newProducer(mirrorSourceTaskConfig.offsetSyncsTopicProducerConfig());
        this.maxOffsetLag = mirrorSourceTaskConfig.maxOffsetLag();
    }

    /**
     * Creates a writer from explicitly supplied collaborators.
     *
     * @param kafkaProducer producer used to send offset syncs; closed by {@link #close()}
     * @param str           name of the offset-syncs topic
     * @param semaphore     limits the number of in-flight offset sync sends
     * @param j             maximum offset lag handed to each new {@link PartitionState}
     */
    public OffsetSyncWriter(KafkaProducer<byte[], byte[]> kafkaProducer, String str, Semaphore semaphore, long j) {
        this.offsetProducer = kafkaProducer;
        this.offsetSyncsTopic = str;
        this.outstandingOffsetSyncs = semaphore;
        this.maxOffsetLag = j;
    }

    /**
     * Closes the offset producer through {@code Utils.closeQuietly}.
     * NOTE(review): presumably close failures are suppressed by that helper - confirm.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        Utils.closeQuietly(this.offsetProducer, "offset producer");
    }

    /** @return the maximum offset lag this writer was configured with */
    public long maxOffsetLag() {
        return this.maxOffsetLag;
    }

    /** @return the live (not copied) map of per-partition state */
    public Map<TopicPartition, PartitionState> partitionStates() {
        return this.partitionStates;
    }

    /**
     * Sends one offset sync to partition 0 of the offset-syncs topic.
     *
     * <p>The caller must already hold a permit from {@code outstandingOffsetSyncs}; the permit is
     * released in the producer callback whether the send succeeded or failed.
     * NOTE(review): if {@code send} throws synchronously without invoking the callback, the permit
     * is not released here - confirm against the KafkaProducer contract.
     */
    private void sendOffsetSync(OffsetSync offsetSync) {
        ProducerRecord<byte[], byte[]> syncRecord = new ProducerRecord<>(
                this.offsetSyncsTopic, 0, offsetSync.recordKey(), offsetSync.recordValue());
        this.offsetProducer.send(syncRecord, (metadata, error) -> {
            if (error != null) {
                LOG.error("Failure sending offset sync.", error);
            } else {
                LOG.trace("Sync'd offsets for {}: {}=={}",
                        offsetSync.topicPartition(), offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
            }
            this.outstandingOffsetSyncs.release();
        });
    }

    /**
     * Sends pending offset syncs, in the pending map's iteration order, until either the map is
     * empty or no semaphore permit can be acquired.
     *
     * <p>Each sync is removed from the pending map only after a permit was obtained, and the
     * actual send is performed outside this instance's monitor.
     */
    public void firePendingOffsetSyncs() {
        while (true) {
            OffsetSync toSend;
            synchronized (this) {
                Iterator<OffsetSync> pending = this.pendingOffsetSyncs.values().iterator();
                if (!pending.hasNext()) {
                    LOG.trace("No more pending offset syncs");
                    return;
                }
                toSend = pending.next();
                if (!this.outstandingOffsetSyncs.tryAcquire()) {
                    // Leave the sync in the map so a later call can retry it.
                    LOG.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
                    return;
                }
                pending.remove();
            }
            sendOffsetSync(toSend);
            LOG.trace("Dispatched offset sync for {}", toSend.topicPartition());
        }
    }

    /**
     * Moves every delayed offset sync into the pending map, replacing any pending sync for the
     * same topic-partition, and then empties the delayed map.
     */
    public synchronized void promoteDelayedOffsetSyncs() {
        this.pendingOffsetSyncs.putAll(this.delayedOffsetSyncs);
        this.delayedOffsetSyncs.clear();
    }

    /**
     * Records the given offsets for a topic-partition and queues a matching offset sync.
     *
     * <p>If the partition's {@link PartitionState} reports that a sync is due, the sync is placed in
     * the pending map (dropping any delayed sync for that partition) and the state's flag is reset.
     * Otherwise the sync replaces the partition's entry in the delayed map.
     *
     * @param topicPartition the partition the offsets belong to
     * @param j              upstream offset
     * @param j2             downstream offset
     */
    public void maybeQueueOffsetSyncs(TopicPartition topicPartition, long j, long j2) {
        PartitionState partitionState = this.partitionStates.computeIfAbsent(
                topicPartition, ignored -> new PartitionState(this.maxOffsetLag));
        OffsetSync offsetSync = new OffsetSync(topicPartition, j, j2);
        if (partitionState.update(j, j2)) {
            synchronized (this) {
                this.delayedOffsetSyncs.remove(topicPartition);
                this.pendingOffsetSyncs.put(topicPartition, offsetSync);
            }
            partitionState.reset();
        } else {
            synchronized (this) {
                this.delayedOffsetSyncs.put(topicPartition, offsetSync);
            }
        }
    }

    /** @return the live (not copied) delayed map; this getter itself takes no lock */
    protected Map<TopicPartition, OffsetSync> getDelayedOffsetSyncs() {
        return this.delayedOffsetSyncs;
    }

    /** @return the live (not copied) pending map; this getter itself takes no lock */
    protected Map<TopicPartition, OffsetSync> getPendingOffsetSyncs() {
        return this.pendingOffsetSyncs;
    }
}
