package io.continual.services.messaging.impl.kafka;

import io.continual.builder.Builder;
import io.continual.messaging.ContinualMessage;
import io.continual.messaging.ContinualMessagePublisher;
import io.continual.messaging.ContinualMessageSink;
import io.continual.messaging.ContinualMessageStream;
import io.continual.messaging.MessagePublishException;
import io.continual.services.ServiceContainer;
import io.continual.services.SimpleService;
import io.continual.util.data.json.JsonVisitor;
import java.io.IOException;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/messaging/impl/kafka/KafkaPublisher.class */
public class KafkaPublisher extends SimpleService implements ContinualMessagePublisher {
    private final KafkaProducer<String, String> fProducer;
    private static final Logger log = LoggerFactory.getLogger(KafkaPublisher.class);

    public KafkaPublisher(ServiceContainer serviceContainer, JSONObject jSONObject) throws Builder.BuildFailure {
        JSONObject evaluateJsonObject = serviceContainer.getExprEval().evaluateJsonObject(jSONObject);
        final Properties properties = new Properties();
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        JsonVisitor.forEachElement(evaluateJsonObject.optJSONObject("kafka"), new JsonVisitor.ObjectVisitor<Object, Builder.BuildFailure>(this) { // from class: io.continual.services.messaging.impl.kafka.KafkaPublisher.1
            final /* synthetic */ KafkaPublisher this$0;

            {
                this.this$0 = this;
            }

            public boolean visit(String str, Object obj) throws JSONException, Builder.BuildFailure {
                properties.put(str, obj.toString());
                return true;
            }
        });
        this.fProducer = new KafkaProducer<>(properties);
    }

    public ContinualMessageSink getTopic(final String str) throws ContinualMessagePublisher.TopicUnavailableException {
        return new ContinualMessageSink(this) { // from class: io.continual.services.messaging.impl.kafka.KafkaPublisher.2
            final /* synthetic */ KafkaPublisher this$0;

            {
                this.this$0 = this;
            }

            public void send(ContinualMessageStream continualMessageStream, Collection<ContinualMessage> collection) throws MessagePublishException {
                for (ContinualMessage continualMessage : collection) {
                    String name = continualMessageStream.getName();
                    String jSONObject = continualMessage.toJson().toString();
                    KafkaPublisher.log.info("To Kafka (" + str + " / " + name + "): " + jSONObject);
                    this.this$0.fProducer.send(new ProducerRecord(str.toString(), name, jSONObject));
                }
                this.this$0.fProducer.flush();
            }
        };
    }

    public void flush() {
    }

    public void close() throws IOException {
        this.fProducer.close();
    }
}
