package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.http.HttpBridgeEndpoint;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/SourceBridgeEndpoint.class */
public abstract class SourceBridgeEndpoint<K, V> implements HttpBridgeEndpoint {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected String name;
    protected final EmbeddedFormat format;
    protected final Serializer<K> keySerializer;
    protected final Serializer<V> valueSerializer;
    protected final BridgeConfig bridgeConfig;
    private Handler<HttpBridgeEndpoint> closeHandler;
    private Producer<K, V> producer;

    public SourceBridgeEndpoint(BridgeConfig bridgeConfig, EmbeddedFormat embeddedFormat, Serializer<K> serializer, Serializer<V> serializer2) {
        this.bridgeConfig = bridgeConfig;
        this.format = embeddedFormat;
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
    }

    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public String name() {
        return this.name;
    }

    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public HttpBridgeEndpoint closeHandler(Handler<HttpBridgeEndpoint> handler) {
        this.closeHandler = handler;
        return this;
    }

    protected void handleClose() {
        if (this.closeHandler != null) {
            this.closeHandler.handle(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletionStage<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.log.trace("Send thread {}", Thread.currentThread());
        this.log.debug("Sending record {}", producerRecord);
        this.producer.send(producerRecord, (recordMetadata, exc) -> {
            this.log.trace("Kafka client callback thread {}", Thread.currentThread());
            this.log.debug("Sent record {} at offset {}", producerRecord, Long.valueOf(recordMetadata.offset()));
            if (exc == null) {
                completableFuture.complete(recordMetadata);
            } else {
                completableFuture.completeExceptionally(exc);
            }
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendIgnoreResult(ProducerRecord<K, V> producerRecord) {
        this.log.trace("Send ignore result thread {}", Thread.currentThread());
        this.log.debug("Sending record {}", producerRecord);
        this.producer.send(producerRecord);
    }

    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public void open() {
        KafkaConfig kafkaConfig = this.bridgeConfig.getKafkaConfig();
        Properties properties = new Properties();
        properties.putAll(kafkaConfig.getConfig());
        properties.putAll(kafkaConfig.getProducerConfig().getConfig());
        TracingUtil.getTracing().addTracingPropsToProducerConfig(properties);
        this.producer = new KafkaProducer(properties, this.keySerializer, this.valueSerializer);
    }

    @Override // io.strimzi.kafka.bridge.http.HttpBridgeEndpoint
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
        handleClose();
    }
}
