package io.continual.services.messaging.impl.kafka;

import io.continual.messaging.ContinualMessage;
import io.continual.messaging.ContinualMessagePublisher;
import io.continual.messaging.ContinualMessageSink;
import io.continual.messaging.ContinualMessageStream;
import io.continual.messaging.MessagePublishException;
import io.continual.services.ServiceContainer;
import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/messaging/impl/kafka/KafkaPublisher.class */
/**
 * A {@link ContinualMessagePublisher} that writes messages to Kafka through a single
 * {@link KafkaProducer} using string keys and string values.
 *
 * <p>Every message is sent as one record whose key is the name of the message stream and whose
 * value is the string form of the message payload.
 *
 * <p>Producer settings are read from the JSON configuration passed to the constructor. Each Kafka
 * property {@code x} is looked up under the JSON key {@code "kafka.x"}.
 */
public class KafkaPublisher implements ContinualMessagePublisher {
    private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);

    /** Prefix prepended to a Kafka property name to form its key in the JSON configuration. */
    private static final String CONFIG_PREFIX = "kafka.";

    private final KafkaProducer<String, String> fProducer;

    /**
     * Builds the producer from the given configuration.
     *
     * <p>{@code bootstrap.servers} has no default here and is only set when present in the
     * configuration; the remaining settings fall back to the defaults listed below.
     *
     * @param serviceContainer the hosting service container (not used by this constructor)
     * @param jSONObject the configuration holding the {@code kafka.*} settings
     * @throws IOException declared for callers; nothing in this constructor throws it directly
     */
    public KafkaPublisher(ServiceContainer serviceContainer, JSONObject jSONObject) throws IOException {
        final Properties properties = new Properties();
        transfer(jSONObject, properties, "bootstrap.servers");
        transfer(jSONObject, properties, "acks", "all");
        transfer(jSONObject, properties, "retries", 0);
        transfer(jSONObject, properties, "batch.size", 16384);
        transfer(jSONObject, properties, "linger.ms", 1);
        transfer(jSONObject, properties, "buffer.memory", 33554432);
        transfer(jSONObject, properties, "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        transfer(jSONObject, properties, "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.fProducer = new KafkaProducer<>(properties);
    }

    /**
     * Returns a sink that publishes to the named Kafka topic.
     *
     * <p>Sends are asynchronous. A delivery failure reported by the producer is logged at warn
     * level rather than being dropped silently; it is not propagated to the caller of
     * {@code send}.
     *
     * @param str the Kafka topic name
     * @return a sink bound to that topic and to this publisher's producer
     * @throws ContinualMessagePublisher.TopicUnavailableException declared by the interface;
     *     not thrown by this implementation
     */
    public ContinualMessageSink getTopic(final String str) throws ContinualMessagePublisher.TopicUnavailableException {
        return new ContinualMessageSink() {
            public void send(ContinualMessageStream continualMessageStream, Collection<ContinualMessage> collection) throws MessagePublishException {
                for (ContinualMessage message : collection) {
                    final String streamName = continualMessageStream.getName();
                    final String payload = message.getMessagePayload().toString();
                    KafkaPublisher.log.debug("To Kafka ({} / {}): {}", str, streamName, payload);

                    final ProducerRecord<String, String> record =
                            new ProducerRecord<String, String>(str, streamName, payload);

                    // The callback runs when the send completes; a non-null exception means
                    // the record was not delivered.
                    KafkaPublisher.this.fProducer.send(record, (metadata, failure) -> {
                        if (failure != null) {
                            KafkaPublisher.log.warn("Kafka send failed ({} / {})", str, streamName, failure);
                        }
                    });
                }
            }
        };
    }

    /**
     * Pushes any records still buffered in the producer out to Kafka, blocking until the
     * associated requests have completed.
     */
    public void flush() {
        this.fProducer.flush();
    }

    /**
     * Closes the underlying producer.
     *
     * @throws IOException declared for callers; the producer's close is invoked directly
     */
    public void close() throws IOException {
        this.fProducer.close();
    }

    /**
     * Copies a setting that has no default; the property is left unset when the configuration
     * does not supply it.
     */
    private static void transfer(JSONObject config, Properties properties, String propertyName) {
        transfer(config, properties, propertyName, (String) null);
    }

    /** Copies a setting, falling back to the given string default. */
    private static void transfer(JSONObject config, Properties properties, String propertyName, String defaultValue) {
        transfer(config, CONFIG_PREFIX + propertyName, properties, propertyName, defaultValue);
    }

    /** Copies a setting, falling back to the given integer default (stored as a string). */
    private static void transfer(JSONObject config, Properties properties, String propertyName, int defaultValue) {
        transfer(config, CONFIG_PREFIX + propertyName, properties, propertyName, Integer.toString(defaultValue));
    }

    /**
     * Reads {@code configKey} from the configuration as a string and, when a value results,
     * logs it and stores it in {@code properties} under {@code propertyName}.
     *
     * @param config the JSON configuration to read from
     * @param configKey the key to look up in {@code config}
     * @param properties the producer properties being assembled
     * @param propertyName the Kafka property name to store the value under
     * @param defaultValue the value to use when the lookup yields nothing; may be {@code null},
     *     in which case the property is skipped
     */
    private static void transfer(JSONObject config, String configKey, Properties properties, String propertyName, String defaultValue) {
        final String value = config.optString(configKey, defaultValue);
        if (value != null) {
            log.info("kafka: {}={}", propertyName, value);
            properties.setProperty(propertyName, value);
        }
    }
}
