package org.apache.kafka.connect.mirror;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorCheckpointTask.class */
/**
 * Source task that emits checkpoint records for a set of consumer groups.
 *
 * <p>On every {@link #poll()} the task reads the committed offsets of each configured consumer
 * group from the source cluster, translates them through the {@link OffsetSyncStore}, and turns
 * the translated offsets into records destined for the checkpoints topic. In addition,
 * {@link #start(Map)} registers two repeating jobs with the {@link Scheduler}: one that snapshots
 * the committed offsets of target-cluster groups whose state is {@code EMPTY}, and one that writes
 * translated offsets to the target cluster for those groups.
 */
public class MirrorCheckpointTask extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);

    private Admin sourceAdminClient;
    private Admin targetAdminClient;
    private String sourceClusterAlias;
    private String targetClusterAlias;
    private String checkpointsTopic;
    private Duration interval;
    private Duration pollTimeout;
    private TopicFilter topicFilter;
    private Set<String> consumerGroups;
    private ReplicationPolicy replicationPolicy;
    private OffsetSyncStore offsetSyncStore;
    // Set by stop(); read by poll() and listConsumerGroupOffsets() to bail out early.
    private boolean stopping;
    private MirrorCheckpointMetrics metrics;
    private Scheduler scheduler;
    // Target-cluster committed offsets of groups last observed in state EMPTY, keyed by group id.
    private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
    private CheckpointStore checkpointStore;

    /** No-arg constructor; all state is populated later by {@link #start(Map)}. */
    public MirrorCheckpointTask() {
    }

    /**
     * Package-private constructor that wires collaborators directly instead of reading them from
     * a task configuration. The topic filter accepts every topic and both the emit interval and
     * the poll timeout are set to one nanosecond. Admin clients, metrics, scheduler and the
     * checkpoints topic are left unset.
     */
    MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias, ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set<String> consumerGroups, Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset, CheckpointStore checkpointStore) {
        this.sourceClusterAlias = sourceClusterAlias;
        this.targetClusterAlias = targetClusterAlias;
        this.replicationPolicy = replicationPolicy;
        this.offsetSyncStore = offsetSyncStore;
        this.consumerGroups = consumerGroups;
        this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
        this.checkpointStore = checkpointStore;
        this.topicFilter = topic -> true;
        this.interval = Duration.ofNanos(1L);
        this.pollTimeout = Duration.ofNanos(1L);
    }

    /**
     * Initializes the task from its configuration and hands store start-up plus the two repeating
     * offset-sync jobs to the scheduler.
     *
     * @param props task configuration properties
     */
    public void start(Map<String, String> props) {
        MirrorCheckpointTaskConfig config = new MirrorCheckpointTaskConfig(props);
        this.stopping = false;
        this.sourceClusterAlias = config.sourceClusterAlias();
        this.targetClusterAlias = config.targetClusterAlias();
        this.consumerGroups = config.taskConsumerGroups();
        this.checkpointsTopic = config.checkpointsTopic();
        this.topicFilter = config.topicFilter();
        this.replicationPolicy = config.replicationPolicy();
        this.interval = config.emitCheckpointsInterval();
        this.pollTimeout = config.consumerPollTimeout();
        this.offsetSyncStore = new OffsetSyncStore(config);
        this.sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
        this.targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
        this.metrics = config.metrics();
        this.idleConsumerGroupsOffset = new HashMap<>();
        this.checkpointStore = new CheckpointStore(config, this.consumerGroups);
        this.scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
        this.scheduler.execute(() -> {
            // The offset sync store is started with the negation of the checkpoint store's start result.
            // NOTE(review): presumably start() reports whether existing checkpoints were read - confirm.
            this.offsetSyncStore.start(!this.checkpointStore.start());
            this.scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(), "refreshing idle consumers group offsets at target cluster");
            this.scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(), "sync idle consumer group offset from source to target");
        }, "starting checkpoint and offset sync stores");
        log.info("{} checkpointing {} consumer groups {}->{}: {}.", Thread.currentThread().getName(), this.consumerGroups.size(), this.sourceClusterAlias, config.targetClusterAlias(), this.consumerGroups);
    }

    /** Intentionally a no-op. */
    public void commit() {
    }

    /**
     * Flags the task as stopping, then closes the topic filter, both stores, both admin clients,
     * the metrics and the scheduler, and logs how long that took.
     */
    public void stop() {
        long startMs = System.currentTimeMillis();
        this.stopping = true;
        Utils.closeQuietly(this.topicFilter, "topic filter");
        Utils.closeQuietly(this.checkpointStore, "checkpoints store");
        Utils.closeQuietly(this.offsetSyncStore, "offset sync store");
        Utils.closeQuietly(this.sourceAdminClient, "source admin client");
        Utils.closeQuietly(this.targetAdminClient, "target admin client");
        Utils.closeQuietly(this.metrics, "metrics");
        Utils.closeQuietly(this.scheduler, "scheduler");
        log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - startMs);
    }

    /**
     * @return the version reported by {@link MirrorCheckpointConnector}
     */
    public String version() {
        return new MirrorCheckpointConnector().version();
    }

    /**
     * Waits for the emit interval to elapse (sleeping in poll-timeout steps), then collects
     * checkpoint records for every configured consumer group.
     *
     * @return the collected records, or {@code null} when the task is stopping, the checkpoint
     *         store is not yet initialized, nothing was produced, or any failure occurred
     * @throws InterruptedException declared for the task contract; failures inside the body are
     *         caught, logged and converted to a {@code null} result
     */
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            long deadline = System.currentTimeMillis() + this.interval.toMillis();
            while (!this.stopping && System.currentTimeMillis() < deadline) {
                Thread.sleep(this.pollTimeout.toMillis());
            }
            if (this.stopping || !this.checkpointStore.isInitialized()) {
                return null;
            }
            List<SourceRecord> records = new ArrayList<>();
            for (String group : this.consumerGroups) {
                records.addAll(sourceRecordsForGroup(group));
            }
            // An empty batch is reported as null rather than as an empty list.
            return records.isEmpty() ? null : records;
        } catch (Throwable t) {
            log.warn("Failure polling consumer state for checkpoints.", t);
            return null;
        }
    }

    /**
     * Builds the checkpoint records for one consumer group and stores the new checkpoints in the
     * checkpoint store.
     *
     * @param group consumer group id
     * @return one record per emitted checkpoint; empty if querying the source cluster failed
     * @throws InterruptedException if interrupted while waiting for the offsets query
     */
    List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
        try {
            long timestamp = System.currentTimeMillis();
            Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
            Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
            this.checkpointStore.update(group, newCheckpoints);
            return newCheckpoints.values().stream()
                    .map(checkpoint -> checkpointRecord(checkpoint, timestamp))
                    .collect(Collectors.toList());
        } catch (ExecutionException e) {
            log.error("Error querying offsets for consumer group {} on cluster {}.", group, this.sourceClusterAlias, e);
            return Collections.emptyList();
        }
    }

    /**
     * Converts a group's source-cluster offsets into the checkpoints that should be emitted.
     *
     * @param upstreamGroupOffsets committed offsets of the group on the source cluster
     * @param group consumer group id
     * @return checkpoints keyed by their (renamed) topic-partition
     */
    Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) {
        return upstreamGroupOffsets.entrySet().stream()
                // only topics accepted by the topic filter
                .filter(entry -> shouldCheckpointTopic(entry.getKey().topic()))
                .map(entry -> checkpoint(group, entry.getKey(), entry.getValue()))
                // drop partitions for which no checkpoint could be built
                .flatMap(maybeCheckpoint -> maybeCheckpoint.map(Stream::of).orElseGet(Stream::empty))
                // drop checkpoints whose translated offset is negative
                .filter(checkpoint -> checkpoint.downstreamOffset() >= 0)
                // drop checkpoints that add nothing over the last stored one
                .filter(this::checkpointIsMoreRecent)
                .collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
    }

    /**
     * Decides whether a checkpoint should be emitted given the last one held in the checkpoint
     * store for the same group and partition.
     *
     * @return {@code true} if there is no previous checkpoint, the upstream offset moved
     *         backwards, or the downstream offset advanced; {@code false} otherwise
     */
    private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
        Map<TopicPartition, Checkpoint> groupCheckpoints = this.checkpointStore.get(checkpoint.consumerGroupId());
        if (groupCheckpoints == null) {
            log.trace("Emitting {} (first for this group)", checkpoint);
            return true;
        }
        Checkpoint lastCheckpoint = groupCheckpoints.get(checkpoint.topicPartition());
        if (lastCheckpoint == null) {
            log.trace("Emitting {} (first for this partition)", checkpoint);
            return true;
        }
        // An upstream rewind is always emitted, even if that moves the downstream offset back.
        if (checkpoint.upstreamOffset() < lastCheckpoint.upstreamOffset()) {
            log.trace("Emitting {} (upstream offset rewind)", checkpoint);
            return true;
        }
        if (checkpoint.downstreamOffset() > lastCheckpoint.downstreamOffset()) {
            log.trace("Emitting {} (downstream offset advanced)", checkpoint);
            return true;
        }
        if (checkpoint.downstreamOffset() != lastCheckpoint.downstreamOffset()) {
            log.trace("Skipping {} (preventing downstream rewind)", checkpoint);
        } else {
            log.trace("Skipping {} (repeated checkpoint)", checkpoint);
        }
        return false;
    }

    /**
     * Fetches the committed offsets of a group from the source cluster.
     *
     * @return the offsets, or an empty map without querying when the task is stopping
     */
    private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group) throws InterruptedException, ExecutionException {
        if (this.stopping) {
            return Collections.emptyMap();
        }
        return this.sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
    }

    /**
     * Builds a checkpoint for one partition of a group.
     *
     * @param group consumer group id
     * @param topicPartition partition on the source cluster
     * @param offsetAndMetadata committed offset on the source cluster; may be {@code null}
     * @return the checkpoint, or empty if the offset is {@code null} or the offset sync store
     *         yields no translation for it
     */
    Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        if (offsetAndMetadata == null) {
            return Optional.empty();
        }
        long upstreamOffset = offsetAndMetadata.offset();
        OptionalLong downstreamOffset = this.offsetSyncStore.translateDownstream(group, topicPartition, upstreamOffset);
        if (!downstreamOffset.isPresent()) {
            return Optional.empty();
        }
        return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition), upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
    }

    /**
     * Wraps a checkpoint in a source record addressed to partition 0 of the checkpoints topic.
     *
     * @param checkpoint checkpoint to serialize
     * @param timestamp record timestamp in epoch milliseconds
     */
    SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
        return new SourceRecord(
                checkpoint.connectPartition(), MirrorUtils.wrapOffset(0L),
                this.checkpointsTopic, 0,
                Schema.BYTES_SCHEMA, checkpoint.recordKey(),
                Schema.BYTES_SCHEMA, checkpoint.recordValue(),
                timestamp);
    }

    /**
     * Maps a source-cluster partition to the corresponding partition name on the target cluster.
     *
     * <p>If the replication policy reports the target cluster as the topic's source, the policy's
     * original topic name is used; otherwise the topic is formatted as a remote topic of the
     * source cluster. The partition number is preserved in both cases.
     */
    TopicPartition renameTopicPartition(TopicPartition upstreamTopicPartition) {
        String upstreamTopic = upstreamTopicPartition.topic();
        String downstreamTopic;
        if (this.targetClusterAlias.equals(this.replicationPolicy.topicSource(upstreamTopic))) {
            downstreamTopic = this.replicationPolicy.originalTopic(upstreamTopic);
        } else {
            downstreamTopic = this.replicationPolicy.formatRemoteTopic(this.sourceClusterAlias, upstreamTopic);
        }
        return new TopicPartition(downstreamTopic, upstreamTopicPartition.partition());
    }

    /**
     * @return whether the configured topic filter accepts the topic
     */
    boolean shouldCheckpointTopic(String topic) {
        return this.topicFilter.shouldReplicateTopic(topic);
    }

    /**
     * Records checkpoint latency as the time elapsed between the record's timestamp and now.
     *
     * @param record the committed checkpoint record
     * @param metadata producer metadata for the record (unused)
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        this.metrics.checkpointLatency(
                MirrorUtils.unwrapPartition(record.sourcePartition()),
                Checkpoint.unwrapGroup(record.sourcePartition()),
                System.currentTimeMillis() - record.timestamp());
    }

    /**
     * Snapshots the target-cluster committed offsets of every configured group whose state on the
     * target cluster is {@code EMPTY}. Failures are logged per group and do not abort the loop.
     */
    private void refreshIdleConsumerGroupOffset() {
        Map<String, KafkaFuture<ConsumerGroupDescription>> groupDescriptions =
                this.targetAdminClient.describeConsumerGroups(this.consumerGroups).describedGroups();
        for (String group : this.consumerGroups) {
            try {
                ConsumerGroupDescription description = groupDescriptions.get(group).get();
                if (description.state() == ConsumerGroupState.EMPTY) {
                    this.idleConsumerGroupsOffset.put(group, this.targetAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get());
                }
            } catch (InterruptedException | ExecutionException e) {
                log.error("Error querying for consumer group {} on cluster {}.", group, this.targetClusterAlias, e);
            }
        }
    }

    /**
     * Writes translated upstream offsets to the target cluster.
     *
     * <p>For a group with no idle snapshot, all translated offsets are synced. For a group with a
     * snapshot, a partition is synced only if it is missing from the snapshot, the snapshot holds
     * a {@code null} offset for it, or the translated offset is strictly ahead of the target's
     * committed offset, so the target group is never moved backwards. The idle snapshot is cleared
     * afterwards.
     *
     * @return the offsets submitted for syncing, keyed by consumer group id
     */
    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
        for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadata>> groupEntry : this.checkpointStore.computeConvertedUpstreamOffset().entrySet()) {
            String consumerGroupId = groupEntry.getKey();
            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = groupEntry.getValue();
            Map<TopicPartition, OffsetAndMetadata> targetConsumerOffset = this.idleConsumerGroupsOffset.get(consumerGroupId);
            if (targetConsumerOffset == null) {
                // No snapshot for this group: sync everything that was translated.
                syncGroupOffset(consumerGroupId, convertedUpstreamOffset);
                offsetToSyncAll.put(consumerGroupId, convertedUpstreamOffset);
                continue;
            }

            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
                TopicPartition topicPartition = convertedEntry.getKey();
                OffsetAndMetadata convertedOffset = convertedEntry.getValue();
                if (targetConsumerOffset.containsKey(topicPartition)) {
                    OffsetAndMetadata targetOffsetAndMetadata = targetConsumerOffset.get(topicPartition);
                    if (targetOffsetAndMetadata != null) {
                        long latestDownstreamOffset = targetOffsetAndMetadata.offset();
                        if (latestDownstreamOffset >= convertedOffset.offset()) {
                            log.trace("latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for TopicPartition {}", latestDownstreamOffset, convertedOffset.offset(), topicPartition);
                            // The target is already at or past the translated offset: syncing
                            // would rewind the target group, so leave this partition alone.
                            continue;
                        }
                    } else {
                        log.warn("Group {} offset for partition {} may has been reset to a negative offset, just sync the offset to target.", consumerGroupId, topicPartition);
                    }
                }
                offsetToSync.put(topicPartition, convertedOffset);
            }

            if (offsetToSync.isEmpty()) {
                log.trace("skip syncing the offset for consumer group: {}", consumerGroupId);
                continue;
            }
            syncGroupOffset(consumerGroupId, offsetToSync);
            offsetToSyncAll.put(consumerGroupId, offsetToSync);
        }
        this.idleConsumerGroupsOffset.clear();
        return offsetToSyncAll;
    }

    /**
     * Submits the given offsets for a group to the target cluster and logs the outcome when the
     * request completes. Does nothing if no target admin client is set.
     *
     * @param consumerGroupId consumer group id on the target cluster
     * @param offsetToSync offsets to commit for that group
     */
    void syncGroupOffset(String consumerGroupId, Map<TopicPartition, OffsetAndMetadata> offsetToSync) {
        if (this.targetAdminClient == null) {
            return;
        }
        this.targetAdminClient.alterConsumerGroupOffsets(consumerGroupId, offsetToSync).all().whenComplete((ignored, error) -> {
            if (error == null) {
                log.trace("Sync-ed {} offsets for consumer group {}.", offsetToSync.size(), consumerGroupId);
            } else if (error.getCause() instanceof UnknownMemberIdException) {
                log.warn("Unable to sync offsets for consumer group {}. This is likely caused by consumers currently using this group in the target cluster.", consumerGroupId);
            } else {
                log.error("Unable to sync offsets for consumer group {}.", consumerGroupId, error);
            }
        });
    }
}
