package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.http.HttpBridgeEndpoint;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/SinkBridgeEndpoint.class */
/**
 * Base class for bridge endpoints that read from Kafka through a {@link Consumer}.
 *
 * <p>It owns the consumer instance, tracks the current subscription/assignment state and exposes
 * thin wrappers around the consumer operations (subscribe, assign, poll, commit, seek). Each
 * wrapper logs the calling thread at trace level before delegating to the consumer.
 *
 * <p>{@link #initConsumer(Properties)} must be called before any method that delegates to the
 * consumer; only {@link #close()} tolerates a consumer that was never created.
 *
 * <p>Several methods are public here although the decompiler reported them as originally
 * protected; their visibility is left as found so existing callers are unaffected.
 *
 * @param <K> type of the record keys produced by the key deserializer
 * @param <V> type of the record values produced by the value deserializer
 */
public abstract class SinkBridgeEndpoint<K, V> implements HttpBridgeEndpoint {
    /** Endpoint name returned by {@link #name()}; it is not assigned anywhere in this class. */
    protected String name;
    /** Embedded format given at construction time; not read by this class itself. */
    protected final EmbeddedFormat format;
    /** Deserializer handed to the Kafka consumer for record keys. */
    protected final Deserializer<K> keyDeserializer;
    /** Deserializer handed to the Kafka consumer for record values. */
    protected final Deserializer<V> valueDeserializer;
    /** Bridge configuration from which the Kafka and consumer settings are read. */
    protected final BridgeConfig bridgeConfig;
    /** Optional callback invoked by {@link #handleClose()}. */
    private Handler<HttpBridgeEndpoint> closeHandler;
    /** Underlying Kafka consumer; {@code null} until {@link #initConsumer(Properties)} runs. */
    private Consumer<K, V> consumer;
    /** Identifier returned by {@link #consumerInstanceId()}; not assigned in this class. */
    protected ConsumerInstanceId consumerInstanceId;
    /** Value used for the consumer's {@code group.id} property. */
    protected String groupId;
    /** Per-instance logger named after the concrete runtime class. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Timeout, in milliseconds, passed to the consumer poll in {@link #consume()}. */
    protected long pollTimeOut = 100;
    /** Not read by this class; presumably a response size limit applied elsewhere (to be confirmed). */
    protected long maxBytes = Long.MAX_VALUE;
    /** Rebalance listener registered with every topic or pattern subscription. */
    private final ConsumerRebalanceListener loggingPartitionsRebalance = new LoggingPartitionsRebalance();
    /** Topics (and partitions) used by {@link #subscribe()} and {@link #assign()}. */
    protected List<SinkTopicSubscription> topicSubscriptions = new ArrayList<>();
    /** Pattern of the current pattern subscription, or {@code null} when there is none. */
    protected Pattern topicSubscriptionsPattern = null;
    /** {@code true} once a topic or pattern subscription has been made on the consumer. */
    protected boolean subscribed = false;
    /** {@code true} once partitions have been manually assigned to the consumer. */
    protected boolean assigned = false;

    /**
     * Creates the endpoint; the Kafka consumer itself is created later by
     * {@link #initConsumer(Properties)}.
     *
     * @param bridgeConfig bridge configuration providing the Kafka settings
     * @param embeddedFormat embedded format stored in {@link #format}
     * @param deserializer deserializer for record keys
     * @param deserializer2 deserializer for record values
     */
    public SinkBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.bridgeConfig = bridgeConfig;
        this.format = embeddedFormat;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
    }

    /**
     * @return the current value of {@link #name}
     */
    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public String name() {
        return this.name;
    }

    /**
     * Registers the callback to invoke when this endpoint is closed.
     *
     * @param handler callback receiving this endpoint; may be {@code null} to have none
     * @return this endpoint, to allow call chaining
     */
    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public HttpBridgeEndpoint closeHandler(Handler<HttpBridgeEndpoint> handler) {
        this.closeHandler = handler;
        return this;
    }

    /**
     * Closes the Kafka consumer, if one was created, and then notifies the close handler.
     *
     * <p>The handler is notified even when closing the consumer throws, so whoever registered it
     * still learns that the endpoint is gone; the consumer's exception then propagates.
     */
    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public void close() {
        try {
            if (this.consumer != null) {
                this.consumer.close();
            }
        } finally {
            handleClose();
        }
    }

    /**
     * @return the current value of {@link #consumerInstanceId}
     */
    public ConsumerInstanceId consumerInstanceId() {
        return this.consumerInstanceId;
    }

    /**
     * Invokes the registered close handler with this endpoint; does nothing when none is set.
     */
    protected void handleClose() {
        if (this.closeHandler != null) {
            this.closeHandler.handle(this);
        }
    }

    /**
     * Creates the Kafka consumer from the bridge configuration.
     *
     * <p>Settings are layered in this order, later entries overriding earlier ones: common Kafka
     * configuration, consumer-specific configuration, {@code group.id} taken from
     * {@link #groupId}, and finally the caller-supplied properties.
     *
     * @param properties extra consumer properties applied last; may be {@code null}
     */
    public void initConsumer(Properties properties) {
        KafkaConfig kafkaConfig = this.bridgeConfig.getKafkaConfig();
        Properties consumerProperties = new Properties();
        consumerProperties.putAll(kafkaConfig.getConfig());
        consumerProperties.putAll(kafkaConfig.getConsumerConfig().getConfig());
        consumerProperties.put("group.id", this.groupId);
        if (properties != null) {
            consumerProperties.putAll(properties);
        }
        this.consumer = new KafkaConsumer<>(consumerProperties, this.keyDeserializer, this.valueDeserializer);
    }

    /**
     * Subscribes the consumer to the topics listed in {@link #topicSubscriptions}.
     *
     * @throws IllegalArgumentException if {@link #topicSubscriptions} is empty
     */
    public void subscribe() {
        if (this.topicSubscriptions.isEmpty()) {
            throw new IllegalArgumentException("At least one topic to subscribe has to be specified!");
        }
        this.log.info("Subscribe to topics {}", this.topicSubscriptions);
        Set<String> topics = this.topicSubscriptions.stream()
                .map(SinkTopicSubscription::getTopic)
                .collect(Collectors.toSet());
        this.log.trace("Subscribe thread {}", Thread.currentThread());
        this.consumer.subscribe(topics, this.loggingPartitionsRebalance);
        // Flag the endpoint only after the consumer accepted the subscription, so a failed call
        // does not leave it marked as subscribed.
        this.subscribed = true;
    }

    /**
     * Removes the consumer's subscription or assignment and resets the tracked state
     * (topic list, pattern and both flags).
     */
    public void unsubscribe() {
        this.log.info("Unsubscribe from topics {}", this.topicSubscriptions);
        this.log.trace("Unsubscribe thread {}", Thread.currentThread());
        this.consumer.unsubscribe();
        // Reset the bookkeeping only once the consumer really dropped its subscription.
        this.topicSubscriptions.clear();
        this.topicSubscriptionsPattern = null;
        this.subscribed = false;
        this.assigned = false;
    }

    /**
     * Returns the partitions currently assigned to the consumer.
     *
     * @return the consumer's current assignment
     */
    public Set<TopicPartition> listSubscriptions() {
        this.log.info("Listing subscribed topics {}", this.topicSubscriptions);
        this.log.trace("ListSubscriptions thread {}", Thread.currentThread());
        return this.consumer.assignment();
    }

    /**
     * Subscribes the consumer to every topic matching the given pattern.
     *
     * @param pattern pattern the topic names have to match
     */
    public void subscribe(Pattern pattern) {
        this.log.info("Subscribe to topics with pattern {}", pattern);
        this.log.trace("Subscribe thread {}", Thread.currentThread());
        this.consumer.subscribe(pattern, this.loggingPartitionsRebalance);
        // Record the pattern and the flag only after the consumer accepted the subscription.
        this.topicSubscriptionsPattern = pattern;
        this.subscribed = true;
    }

    /**
     * Manually assigns to the consumer the topic partitions listed in {@link #topicSubscriptions}.
     *
     * <p>Every entry is expected to carry a partition; an entry whose partition is {@code null}
     * makes this method fail with a {@link NullPointerException}.
     *
     * @throws IllegalArgumentException if {@link #topicSubscriptions} is empty
     */
    public void assign() {
        if (this.topicSubscriptions.isEmpty()) {
            throw new IllegalArgumentException("At least one topic to subscribe has to be specified!");
        }
        this.log.info("Assigning to topics partitions {}", this.topicSubscriptions);
        Set<TopicPartition> partitions = new HashSet<>();
        for (SinkTopicSubscription subscription : this.topicSubscriptions) {
            partitions.add(new TopicPartition(subscription.getTopic(), subscription.getPartition().intValue()));
        }
        this.log.trace("Assign thread {}", Thread.currentThread());
        this.consumer.assign(partitions);
        // Flag the endpoint only after the consumer accepted the assignment.
        this.assigned = true;
    }

    /**
     * Polls the consumer once, waiting at most {@link #pollTimeOut} milliseconds.
     *
     * @return the records returned by the poll
     */
    public ConsumerRecords<K, V> consume() {
        this.log.trace("Poll thread {}", Thread.currentThread());
        return this.consumer.poll(Duration.ofMillis(this.pollTimeOut));
    }

    /**
     * Synchronously commits the given offsets.
     *
     * @param map offsets to commit, keyed by topic partition
     * @return the same map instance that was passed in
     */
    public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetAndMetadata> map) {
        this.log.trace("Commit thread {}", Thread.currentThread());
        this.consumer.commitSync(map);
        return map;
    }

    /**
     * Synchronously commits the offsets returned by the last poll.
     */
    public void commitLastPolledOffsets() {
        this.log.trace("Commit thread {}", Thread.currentThread());
        this.consumer.commitSync();
    }

    /**
     * Moves the consumer's position on a partition to the given offset.
     *
     * @param topicPartition partition to reposition
     * @param j offset to seek to
     */
    public void seek(TopicPartition topicPartition, long j) {
        this.log.trace("Seek thread {}", Thread.currentThread());
        this.consumer.seek(topicPartition, j);
    }

    /**
     * Moves the consumer's position to the beginning of the given partitions.
     *
     * @param set partitions to reposition
     */
    public void seekToBeginning(Set<TopicPartition> set) {
        this.log.trace("SeekToBeginning thread {}", Thread.currentThread());
        this.consumer.seekToBeginning(set);
    }

    /**
     * Moves the consumer's position to the end of the given partitions.
     *
     * @param set partitions to reposition
     */
    public void seekToEnd(Set<TopicPartition> set) {
        this.log.trace("SeekToEnd thread {}", Thread.currentThread());
        this.consumer.seekToEnd(set);
    }
}
