package jp.ad.sinet.stream.plugins.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Logger;
import jp.ad.sinet.stream.api.Consistency;
import jp.ad.sinet.stream.spi.WriterParameters;
import jp.ad.sinet.stream.utils.MessageUtils;
import lombok.Generated;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaBaseWriter.class */
public class KafkaBaseWriter extends KafkaBaseIO {

    @Generated
    private static final Logger log = Logger.getLogger(KafkaBaseWriter.class.getName());
    protected KafkaProducer<String, byte[]> producer;
    protected final String topic;
    protected final AtomicInteger inTransaction;
    private static final Map<String, Function<Object, Object>> PRODUCER_PARAMETER_NAMES_MAP;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: jp.ad.sinet.stream.plugins.kafka.KafkaBaseWriter$1, reason: invalid class name */
    /* loaded from: input_file:jp/ad/sinet/stream/plugins/kafka/KafkaBaseWriter$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$jp$ad$sinet$stream$api$Consistency = new int[Consistency.values().length];

        static {
            try {
                $SwitchMap$jp$ad$sinet$stream$api$Consistency[Consistency.EXACTLY_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$jp$ad$sinet$stream$api$Consistency[Consistency.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$jp$ad$sinet$stream$api$Consistency[Consistency.AT_MOST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaBaseWriter(WriterParameters writerParameters) {
        super(writerParameters.getService(), writerParameters.getConsistency(), writerParameters.getClientId(), writerParameters.getConfig(), writerParameters.getValueType(), writerParameters.isDataEncryption());
        this.inTransaction = new AtomicInteger(0);
        this.topic = writerParameters.getTopic();
        Properties kafkaProperties = getKafkaProperties();
        setupProducerProperties(writerParameters, kafkaProperties);
        try {
            setupProducer(kafkaProperties);
        } catch (Throwable th) {
            throw wrapSinetStreamException(th);
        }
    }

    private void setupProducerProperties(WriterParameters writerParameters, Properties properties) {
        PRODUCER_PARAMETER_NAMES_MAP.forEach((str, function) -> {
            updateProperty(properties, str, function);
        });
        setupConsistencyProperties(properties);
        setupSSLProperties(writerParameters.getConfig(), properties);
        setupSASLProperties(writerParameters.getConfig(), properties);
    }

    private void setupConsistencyProperties(Properties properties) {
        switch (AnonymousClass1.$SwitchMap$jp$ad$sinet$stream$api$Consistency[this.consistency.ordinal()]) {
            case 1:
                properties.put("enable.idempotence", "true");
                setupTransactionId(properties);
                return;
            case 2:
                properties.put("acks", "all");
                return;
            case 3:
                properties.put("acks", "0");
                return;
            default:
                return;
        }
    }

    private void setupProducer(Properties properties) {
        log.fine(() -> {
            return "KAFKA producer init: " + getClientId();
        });
        this.producer = new KafkaProducer<>(properties, Serdes.String().serializer(), Serdes.ByteArray().serializer());
        if (getConsistency().equals(Consistency.EXACTLY_ONCE)) {
            initTransaction();
        }
    }

    public void close() {
        log.fine(() -> {
            return "KAFKA producer close: " + getClientId();
        });
        this.producer.close();
    }

    public synchronized void initTransaction() {
        this.producer.initTransactions();
        log.finer(() -> {
            return "KAFKA init transaction: " + getClientId();
        });
    }

    public synchronized void beginTransaction() {
        if (this.inTransaction.getAndIncrement() == 0) {
            this.producer.beginTransaction();
            log.finer(() -> {
                return "KAFKA begin transaction: " + getClientId();
            });
        }
    }

    public synchronized void commitTransaction() {
        if (this.inTransaction.decrementAndGet() <= 0) {
            this.producer.commitTransaction();
            log.finer(() -> {
                return "KAFKA commit transaction: " + getClientId();
            });
        }
    }

    public synchronized void abortTransaction() {
        if (this.inTransaction.decrementAndGet() <= 0) {
            this.producer.abortTransaction();
            log.finer(() -> {
                return "KAFKA abort transaction: " + getClientId();
            });
        }
    }

    public Object getMetrics() {
        return this.producer.metrics();
    }

    public void resetMetrics() {
    }

    @Generated
    public String getTopic() {
        return this.topic;
    }

    static {
        HashMap hashMap = new HashMap();
        hashMap.put("acks", MessageUtils::toString);
        hashMap.put("compression.type", MessageUtils::toString);
        hashMap.put("retries", MessageUtils::toInteger);
        hashMap.put("batch.size", MessageUtils::toInteger);
        hashMap.put("linger.ms", MessageUtils::toLong);
        hashMap.put("buffer.memory", MessageUtils::toLong);
        hashMap.put("max.block.ms", MessageUtils::toLong);
        hashMap.put("max.request.size", MessageUtils::toInteger);
        hashMap.put("max.in.flight.requests.per.connection", MessageUtils::toInteger);
        hashMap.put("delivery.timeout.ms", MessageUtils::toInteger);
        hashMap.put("enable.idempotence", MessageUtils::toBoolean);
        hashMap.put("transaction.timeout.ms", MessageUtils::toInteger);
        hashMap.put("transactional.id", MessageUtils::toString);
        PRODUCER_PARAMETER_NAMES_MAP = Collections.unmodifiableMap(hashMap);
    }
}
