package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/SinkBridgeEndpoint.class */
/**
 * Base class for bridge endpoints that read from Kafka through a Vert.x {@link KafkaConsumer}.
 *
 * <p>The consumer is only created by {@link #initConsumer(Properties)}; every method that talks
 * to Kafka (subscribe, assign, consume, commit, seek, ...) dereferences it directly and therefore
 * must not be called before the consumer has been initialised.
 *
 * @param <K> type of the record keys produced by the key deserializer
 * @param <V> type of the record values produced by the value deserializer
 */
public abstract class SinkBridgeEndpoint<K, V> implements BridgeEndpoint {
    /** Endpoint name returned by {@link #name()}. */
    protected String name;
    /** Embedded format supplied at construction time; not read by this class itself. */
    protected final EmbeddedFormat format;
    protected final Deserializer<K> keyDeserializer;
    protected final Deserializer<V> valueDeserializer;
    protected final Vertx vertx;
    protected final BridgeConfig bridgeConfig;
    /** Callback invoked by {@link #handleClose()}; may be {@code null}. */
    private Handler<BridgeEndpoint> closeHandler;
    /** Underlying consumer; {@code null} until {@link #initConsumer(Properties)} has run. */
    private KafkaConsumer<K, V> consumer;
    protected ConsumerInstanceId consumerInstanceId;
    /** Value written to the {@code group.id} consumer property by {@link #initConsumer(Properties)}. */
    protected String groupId;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Timeout, in milliseconds, handed to the consumer on every {@link #consume(Handler)} call. */
    protected long pollTimeOut = 100;
    /** Not read by this class; presumably an upper bound on response size used by subclasses. */
    protected long maxBytes = Long.MAX_VALUE;
    private PartitionsAssignmentHandle partitionsAssignmentHandle = new NoopPartitionsAssignmentHandle();
    /** Topics (and, for {@link #assign(Handler)}, partitions) this endpoint should consume from. */
    protected List<SinkTopicSubscription> topicSubscriptions = new ArrayList<>();
    /** Pattern given to the last {@link #subscribe(Pattern, Handler)} call; reset by unsubscribe. */
    protected Pattern topicSubscriptionsPattern = null;
    /** Set by both subscribe variants, cleared by {@link #unsubscribe(Handler)}. */
    protected boolean subscribed = false;
    /** Set by {@link #assign(Handler)}, cleared by {@link #unsubscribe(Handler)}. */
    protected boolean assigned = false;

    /**
     * Stores the collaborators; no Kafka consumer is created here.
     *
     * @param vertx            Vert.x instance used to create the consumer
     * @param bridgeConfig     bridge configuration providing the Kafka settings
     * @param embeddedFormat   embedded format associated with this endpoint
     * @param deserializer     deserializer for record keys
     * @param deserializer2    deserializer for record values
     */
    public SinkBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.vertx = vertx;
        this.bridgeConfig = bridgeConfig;
        this.format = embeddedFormat;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
    }

    @Override
    public String name() {
        return this.name;
    }

    /**
     * Registers the callback invoked when this endpoint is closed.
     *
     * @param handler callback to run on close; may be {@code null}
     * @return this endpoint, for call chaining
     */
    @Override
    public BridgeEndpoint closeHandler(Handler<BridgeEndpoint> handler) {
        this.closeHandler = handler;
        return this;
    }

    /** Closes the consumer, if one was created, and then notifies the close handler. */
    @Override
    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
        handleClose();
    }

    /**
     * @return the consumer instance identifier currently set on this endpoint
     */
    public ConsumerInstanceId consumerInstanceId() {
        return this.consumerInstanceId;
    }

    /** Invokes the registered close handler with this endpoint; does nothing if none is set. */
    protected void handleClose() {
        if (this.closeHandler != null) {
            this.closeHandler.handle(this);
        }
    }

    /**
     * Builds the consumer configuration and creates the Kafka consumer.
     *
     * <p>Settings are layered in this order, later entries overriding earlier ones: common Kafka
     * configuration, consumer-specific configuration, {@code group.id} taken from
     * {@link #groupId}, and finally the caller-supplied {@code properties}.
     * {@link Properties} rejects {@code null} values, so {@link #groupId} must be set beforehand.
     *
     * @param properties additional consumer settings; may be {@code null}
     */
    public void initConsumer(Properties properties) {
        KafkaConfig kafkaConfig = this.bridgeConfig.getKafkaConfig();
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(kafkaConfig.getConfig());
        consumerProperties.putAll(kafkaConfig.getConsumerConfig().getConfig());
        consumerProperties.put("group.id", this.groupId);
        if (properties != null) {
            consumerProperties.putAll(properties);
        }
        this.consumer = KafkaConsumer.create(this.vertx, consumerProperties, this.keyDeserializer, this.valueDeserializer);
    }

    /**
     * Subscribes the consumer to the distinct topic names found in {@link #topicSubscriptions}.
     *
     * @param handler completion callback; may be {@code null}
     * @throws IllegalArgumentException if {@link #topicSubscriptions} is empty
     */
    public void subscribe(Handler<AsyncResult<Void>> handler) {
        if (this.topicSubscriptions.isEmpty()) {
            throw new IllegalArgumentException("At least one topic to subscribe has to be specified!");
        }
        this.log.info("Subscribe to topics {}", this.topicSubscriptions);
        this.subscribed = true;
        setPartitionsAssignmentHandlers();
        Set<String> topics = this.topicSubscriptions.stream()
                .map(SinkTopicSubscription::getTopic)
                .collect(Collectors.toSet());
        this.consumer.subscribe(topics, nullSafe(handler));
    }

    /**
     * Forgets all recorded subscriptions (list and pattern), resets the subscribed/assigned flags
     * and unsubscribes the consumer.
     *
     * @param handler completion callback; may be {@code null}
     */
    public void unsubscribe(Handler<AsyncResult<Void>> handler) {
        // Log first: the list is emptied on the next line.
        this.log.info("Unsubscribe from topics {}", this.topicSubscriptions);
        this.topicSubscriptions.clear();
        this.topicSubscriptionsPattern = null;
        this.subscribed = false;
        this.assigned = false;
        this.consumer.unsubscribe(nullSafe(handler));
    }

    /**
     * Asks the consumer for its current partition assignment.
     *
     * @param handler receives the assigned topic partitions; passed to the consumer as-is
     */
    public void listSubscriptions(Handler<AsyncResult<Set<TopicPartition>>> handler) {
        this.log.info("Listing subscribed topics {}", this.topicSubscriptions);
        this.consumer.assignment(handler);
    }

    /**
     * Subscribes the consumer to every topic matching the given pattern and remembers the pattern
     * in {@link #topicSubscriptionsPattern}.
     *
     * @param pattern topic name pattern
     * @param handler completion callback; may be {@code null}
     */
    public void subscribe(Pattern pattern, Handler<AsyncResult<Void>> handler) {
        this.topicSubscriptionsPattern = pattern;
        this.log.info("Subscribe to topics with pattern {}", pattern);
        setPartitionsAssignmentHandlers();
        this.subscribed = true;
        this.consumer.subscribe(pattern, nullSafe(handler));
    }

    /**
     * Manually assigns the consumer to the topic partitions listed in {@link #topicSubscriptions}.
     * Each subscription's partition is unboxed, so it must not be {@code null}.
     *
     * @param handler completion callback; may be {@code null}
     * @throws IllegalArgumentException if {@link #topicSubscriptions} is empty
     */
    public void assign(Handler<AsyncResult<Void>> handler) {
        if (this.topicSubscriptions.isEmpty()) {
            throw new IllegalArgumentException("At least one topic to subscribe has to be specified!");
        }
        this.log.info("Assigning to topics partitions {}", this.topicSubscriptions);
        this.assigned = true;
        Set<TopicPartition> partitions = new HashSet<>();
        for (SinkTopicSubscription subscription : this.topicSubscriptions) {
            partitions.add(new TopicPartition(subscription.getTopic(), subscription.getPartition().intValue()));
        }
        this.consumer.assign(partitions, result -> {
            // The caller is notified before the success log line is written.
            if (handler != null) {
                handler.handle(result);
            }
            if (result.succeeded()) {
                this.log.debug("Assigned to topic partitions {}", partitions);
            }
        });
    }

    /**
     * Installs revoked/assigned listeners on the consumer that log the affected partitions at
     * debug level and forward them to the partitions assignment handle.
     */
    private void setPartitionsAssignmentHandlers() {
        this.consumer.partitionsRevokedHandler(revoked -> {
            this.log.debug("Partitions revoked {}", revoked.size());
            logPartitions(revoked);
            if (this.partitionsAssignmentHandle != null) {
                this.partitionsAssignmentHandle.handleRevokedPartitions(revoked);
            }
        });
        this.consumer.partitionsAssignedHandler(assignedPartitions -> {
            this.log.debug("Partitions assigned {}", assignedPartitions.size());
            logPartitions(assignedPartitions);
            if (this.partitionsAssignmentHandle != null) {
                this.partitionsAssignmentHandle.handleAssignedPartitions(assignedPartitions);
            }
        });
    }

    /** Writes one debug line per partition; skipped entirely when debug logging is disabled. */
    private void logPartitions(Set<TopicPartition> partitions) {
        if (!this.log.isDebugEnabled()) {
            return;
        }
        for (TopicPartition partition : partitions) {
            this.log.debug("topic {} partition {}", partition.getTopic(), partition.getPartition());
        }
    }

    /**
     * Wraps a possibly-{@code null} completion handler so the consumer always receives a
     * non-null callback; the result is dropped when no handler was supplied.
     */
    private static <T> Handler<AsyncResult<T>> nullSafe(Handler<AsyncResult<T>> handler) {
        return result -> {
            if (handler != null) {
                handler.handle(result);
            }
        };
    }

    /**
     * Polls the consumer once, waiting up to {@link #pollTimeOut} milliseconds.
     *
     * @param handler receives the polled records; passed to the consumer as-is
     */
    public void consume(Handler<AsyncResult<KafkaConsumerRecords<K, V>>> handler) {
        this.consumer.poll(Duration.ofMillis(this.pollTimeOut), handler);
    }

    /**
     * Commits the given offsets.
     *
     * @param map     offsets to commit, keyed by topic partition
     * @param handler receives the commit outcome; passed to the consumer as-is
     */
    public void commit(Map<TopicPartition, OffsetAndMetadata> map, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> handler) {
        this.consumer.commit(map, handler);
    }

    /**
     * Delegates to the consumer's no-offset commit.
     *
     * @param handler receives the commit outcome; passed to the consumer as-is
     */
    public void commit(Handler<AsyncResult<Void>> handler) {
        this.consumer.commit(handler);
    }

    /**
     * Moves the fetch position of one partition to the given offset.
     *
     * @param topicPartition partition to reposition
     * @param j              target offset
     * @param handler        completion callback; may be {@code null}
     */
    public void seek(TopicPartition topicPartition, long j, Handler<AsyncResult<Void>> handler) {
        this.consumer.seek(topicPartition, j, nullSafe(handler));
    }

    /**
     * Moves the fetch position of the given partitions to their beginning.
     *
     * @param set     partitions to reposition
     * @param handler completion callback; may be {@code null}
     */
    public void seekToBeginning(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        this.consumer.seekToBeginning(set, nullSafe(handler));
    }

    /**
     * Moves the fetch position of the given partitions to their end.
     *
     * @param set     partitions to reposition
     * @param handler completion callback; may be {@code null}
     */
    public void seekToEnd(Set<TopicPartition> set, Handler<AsyncResult<Void>> handler) {
        this.consumer.seekToEnd(set, nullSafe(handler));
    }
}
