package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.BridgeConfig;
import io.strimzi.kafka.bridge.config.KafkaConfig;
import io.strimzi.kafka.bridge.tracing.TracingUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/strimzi/kafka/bridge/SourceBridgeEndpoint.class */
/**
 * Base class for bridge endpoints that publish records through a Vert.x {@link KafkaProducer}.
 *
 * <p>The producer is created by {@link #open()} from the bridge's Kafka configuration and released
 * by {@link #close()}. Records are handed to the producer through
 * {@link #send(KafkaProducerRecord, Handler)}.
 *
 * @param <K> type of the record keys, serialized with {@link #keySerializer}
 * @param <V> type of the record values, serialized with {@link #valueSerializer}
 */
public abstract class SourceBridgeEndpoint<K, V> implements BridgeEndpoint {
    /** Logger bound to the concrete runtime class, so messages are attributed to the subclass. */
    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Endpoint name returned by {@link #name()}; never assigned in this class (presumably set by subclasses). */
    protected String name;

    /** Embedded data format given at construction time; only stored here, not interpreted by this class. */
    protected final EmbeddedFormat format;

    /** Serializer passed to the producer for record keys. */
    protected final Serializer<K> keySerializer;

    /** Serializer passed to the producer for record values. */
    protected final Serializer<V> valueSerializer;

    /** Vert.x instance used to create the producer. */
    protected final Vertx vertx;

    /** Bridge configuration from which the producer properties are built. */
    protected final BridgeConfig bridgeConfig;

    /** Callback invoked from {@link #handleClose()}; {@code null} until one is registered. */
    private Handler<BridgeEndpoint> closeHandler;

    /** Producer created by {@link #open()}; {@code null} until then. */
    private KafkaProducer<K, V> producer;

    /**
     * Stores the collaborators needed to create the producer later; no producer is created here.
     *
     * @param vertx           Vert.x instance used when creating the producer
     * @param bridgeConfig    bridge configuration providing the Kafka settings
     * @param format          embedded data format associated with this endpoint
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     */
    public SourceBridgeEndpoint(Vertx vertx, BridgeConfig bridgeConfig, EmbeddedFormat format,
                                Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.vertx = vertx;
        this.bridgeConfig = bridgeConfig;
        this.format = format;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    /**
     * @return the current value of {@link #name}, which may be {@code null} if it was never assigned
     */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public String name() {
        return this.name;
    }

    /**
     * Registers the callback to run when this endpoint is closed, replacing any earlier one.
     *
     * @param handler callback receiving this endpoint; {@code null} disables the notification
     * @return this endpoint, to allow call chaining
     */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public BridgeEndpoint closeHandler(Handler<BridgeEndpoint> handler) {
        this.closeHandler = handler;
        return this;
    }

    /**
     * Invokes the registered close handler with this endpoint, if one has been registered.
     */
    protected void handleClose() {
        Handler<BridgeEndpoint> registered = this.closeHandler;
        if (registered != null) {
            registered.handle(this);
        }
    }

    /**
     * Hands a record to the producer.
     *
     * <p>NOTE(review): a decompiler note on this method says it was originally declared
     * {@code protected}; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param kafkaProducerRecord record to send
     * @param handler             callback receiving the send outcome; when {@code null} the
     *                            producer's single-argument {@code send} is used instead
     * @throws IllegalStateException if no producer has been created yet, i.e. {@link #open()}
     *                               has not been called
     */
    public void send(KafkaProducerRecord<K, V> kafkaProducerRecord, Handler<AsyncResult<RecordMetadata>> handler) {
        KafkaProducer<K, V> current = this.producer;
        if (current == null) {
            // Fail with an explanatory error rather than an anonymous NullPointerException.
            throw new IllegalStateException("Cannot send record: the producer has not been created, call open() first");
        }
        this.log.debug("Sending record {}", kafkaProducerRecord);
        if (handler == null) {
            current.send(kafkaProducerRecord);
        } else {
            current.send(kafkaProducerRecord, handler);
        }
    }

    /**
     * Creates the producer used by {@link #send(KafkaProducerRecord, Handler)}.
     *
     * <p>The producer properties are the common Kafka configuration overlaid with the
     * producer-specific configuration (later {@code putAll} wins on duplicate keys), then passed
     * to the tracing layer so it can add its own entries. If a producer from an earlier call is
     * still referenced, it is closed once the new one is in place so it is not leaked.
     */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void open() {
        KafkaConfig kafkaConfig = this.bridgeConfig.getKafkaConfig();
        Properties properties = new Properties();
        properties.putAll(kafkaConfig.getConfig());
        properties.putAll(kafkaConfig.getProducerConfig().getConfig());
        TracingUtil.getTracing().addTracingPropsToProducerConfig(properties);

        // Create the replacement first: if creation throws, the previous producer stays usable.
        KafkaProducer<K, V> previous = this.producer;
        this.producer = KafkaProducer.create(this.vertx, properties, this.keySerializer, this.valueSerializer);
        if (previous != null) {
            this.log.warn("open() called while a producer already existed; closing the previous producer");
            previous.close();
        }
    }

    /**
     * Closes the producer, if one was created, and then notifies the close handler.
     *
     * <p>The close handler is notified even when closing the producer throws; the exception
     * still propagates to the caller afterwards.
     */
    @Override // io.strimzi.kafka.bridge.BridgeEndpoint
    public void close() {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            handleClose();
        }
    }
}
