package org.opensearch.migrations.trafficcapture.kafkaoffloader;

import com.google.protobuf.CodedOutputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import lombok.NonNull;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
import org.opensearch.migrations.trafficcapture.OrderedStreamLifecyleManager;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.IRootKafkaOffloaderContext;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.KafkaRecordContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.class */
public class KafkaCaptureFactory implements IConnectionCaptureFactory<RecordMetadata> {
    private static final Logger log = LoggerFactory.getLogger(KafkaCaptureFactory.class);
    private static final String DEFAULT_TOPIC_NAME_FOR_TRAFFIC = "logging-traffic-topic";
    public static final int KAFKA_MESSAGE_OVERHEAD_BYTES = 500;
    private final IRootKafkaOffloaderContext rootScope;
    private final String nodeId;
    private final Producer<String, byte[]> producer;
    private final String topicNameForTraffic;
    private final int bufferSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory$CodedOutputStreamWrapper.class */
    public static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
        private final CodedOutputStream codedOutputStream;
        private final ByteBuffer byteBuffer;

        public int getOutputStreamBytesLimit() {
            return this.byteBuffer.limit();
        }

        @NonNull
        public CodedOutputStream getOutputStream() {
            return this.codedOutputStream;
        }

        public CodedOutputStreamWrapper(CodedOutputStream codedOutputStream, ByteBuffer byteBuffer) {
            this.codedOutputStream = codedOutputStream;
            this.byteBuffer = byteBuffer;
        }
    }

    /* loaded from: input_file:org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory$StreamManager.class */
    class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
        IConnectionContext telemetryContext;
        IRootKafkaOffloaderContext rootScope;
        Instant startTime = Instant.now();

        public StreamManager(IRootKafkaOffloaderContext iRootKafkaOffloaderContext, IConnectionContext iConnectionContext) {
            this.rootScope = iRootKafkaOffloaderContext;
            this.telemetryContext = iConnectionContext;
        }

        /* renamed from: createStream, reason: merged with bridge method [inline-methods] */
        public CodedOutputStreamWrapper m1createStream() {
            this.telemetryContext.getCurrentSpan().addEvent("streamCreated");
            ByteBuffer allocate = ByteBuffer.allocate(KafkaCaptureFactory.this.bufferSize);
            return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(allocate), allocate);
        }

        public CompletableFuture<RecordMetadata> kickoffCloseStream(CodedOutputStreamHolder codedOutputStreamHolder, int i) {
            if (!(codedOutputStreamHolder instanceof CodedOutputStreamWrapper)) {
                throw new IllegalArgumentException("Unknown outputStreamHolder sent back to StreamManager: " + codedOutputStreamHolder);
            }
            String format = String.format("%s.%d", this.telemetryContext.getConnectionId(), Integer.valueOf(i));
            ByteBuffer byteBuffer = ((CodedOutputStreamWrapper) codedOutputStreamHolder).byteBuffer;
            ProducerRecord producerRecord = new ProducerRecord(KafkaCaptureFactory.this.topicNameForTraffic, format, Arrays.copyOfRange(byteBuffer.array(), 0, byteBuffer.position()));
            KafkaCaptureFactory.log.debug("Sending Kafka producer record: {} for topic: {}", format, KafkaCaptureFactory.this.topicNameForTraffic);
            KafkaRecordContext createKafkaRecordContext = this.rootScope.createKafkaRecordContext(this.telemetryContext, KafkaCaptureFactory.this.topicNameForTraffic, format, ((byte[]) producerRecord.value()).length);
            return KafkaCaptureFactory.sendFullyAsync(KafkaCaptureFactory.this.producer, producerRecord).whenComplete((recordMetadata, th) -> {
                if (th != null) {
                    createKafkaRecordContext.addException(th, true);
                    KafkaCaptureFactory.log.error("Error sending producer record: {}", format, th);
                } else {
                    KafkaCaptureFactory.log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}", new Object[]{format, recordMetadata.topic(), Integer.valueOf(recordMetadata.partition())});
                }
                createKafkaRecordContext.close();
            });
        }
    }

    public KafkaCaptureFactory(IRootKafkaOffloaderContext iRootKafkaOffloaderContext, String str, Producer<String, byte[]> producer, String str2, int i) {
        this.rootScope = iRootKafkaOffloaderContext;
        this.nodeId = str;
        this.producer = producer;
        this.topicNameForTraffic = str2;
        this.bufferSize = i - KAFKA_MESSAGE_OVERHEAD_BYTES;
    }

    public KafkaCaptureFactory(IRootKafkaOffloaderContext iRootKafkaOffloaderContext, String str, Producer<String, byte[]> producer, int i) {
        this(iRootKafkaOffloaderContext, str, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, i);
    }

    public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(IConnectionContext iConnectionContext) {
        return new StreamChannelConnectionCaptureSerializer(this.nodeId, iConnectionContext.getConnectionId(), new StreamManager(this.rootScope, iConnectionContext));
    }

    public static <K, V> CompletableFuture<RecordMetadata> sendFullyAsync(Producer<K, V> producer, ProducerRecord<K, V> producerRecord) {
        CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();
        ForkJoinPool.commonPool().execute(() -> {
            try {
                producer.send(producerRecord, (recordMetadata, exc) -> {
                    if (exc != null) {
                        completableFuture.completeExceptionally(exc);
                    } else {
                        completableFuture.complete(recordMetadata);
                    }
                });
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }
}
