package org.opensearch.migrations.replay.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamAndKey;
import org.opensearch.migrations.replay.tracing.ChannelContextManager;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.tracing.ITrafficSourceContexts;
import org.opensearch.migrations.replay.tracing.ReplayContexts;
import org.opensearch.migrations.replay.tracing.RootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource;
import org.opensearch.migrations.replay.traffic.source.ITrafficStreamWithKey;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSource.class */
public class KafkaTrafficCaptureSource implements ISimpleTrafficCaptureSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaTrafficCaptureSource.class);
    public static final String MAX_POLL_INTERVAL_KEY = "max.poll.interval.ms";
    public static final String DEFAULT_POLL_INTERVAL_MS = "60000";
    final TrackingKafkaConsumer trackingKafkaConsumer;
    private final ExecutorService kafkaExecutor;
    private final AtomicLong trafficStreamsRead;
    private final KafkaBehavioralPolicy behavioralPolicy;
    private final ChannelContextManager channelContextManager;
    private final AtomicBoolean isClosed;

    public KafkaTrafficCaptureSource(@NonNull RootReplayerContext rootReplayerContext, Consumer<String, byte[]> consumer, String str, Duration duration) {
        this(rootReplayerContext, consumer, str, duration, Clock.systemUTC(), new KafkaBehavioralPolicy());
        if (rootReplayerContext == null) {
            throw new NullPointerException("globalContext is marked non-null but is null");
        }
    }

    public KafkaTrafficCaptureSource(@NonNull RootReplayerContext rootReplayerContext, Consumer<String, byte[]> consumer, @NonNull String str, Duration duration, Clock clock, @NonNull KafkaBehavioralPolicy kafkaBehavioralPolicy) {
        if (rootReplayerContext == null) {
            throw new NullPointerException("globalContext is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("topic is marked non-null but is null");
        }
        if (kafkaBehavioralPolicy == null) {
            throw new NullPointerException("behavioralPolicy is marked non-null but is null");
        }
        this.channelContextManager = new ChannelContextManager(rootReplayerContext);
        this.trackingKafkaConsumer = new TrackingKafkaConsumer(rootReplayerContext, consumer, str, duration, clock, this::onKeyFinishedCommitting);
        this.trafficStreamsRead = new AtomicLong();
        this.behavioralPolicy = kafkaBehavioralPolicy;
        consumer.subscribe(Collections.singleton(str), this.trackingKafkaConsumer);
        this.kafkaExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("kafkaConsumerThread"));
        this.isClosed = new AtomicBoolean(false);
    }

    private void onKeyFinishedCommitting(ITrafficStreamKey iTrafficStreamKey) {
        IScopedInstrumentationAttributes enclosingScope = iTrafficStreamKey.getTrafficStreamsContext().getEnclosingScope();
        if (!(enclosingScope instanceof ReplayContexts.KafkaRecordContext)) {
            throw new IllegalArgumentException("Expected parent context of type " + ReplayContexts.KafkaRecordContext.class + " instead of " + enclosingScope + " (of type=" + enclosingScope.getClass() + ")");
        }
        ReplayContexts.KafkaRecordContext kafkaRecordContext = (ReplayContexts.KafkaRecordContext) enclosingScope;
        kafkaRecordContext.close();
        this.channelContextManager.releaseContextFor((IReplayContexts.IChannelKeyContext) kafkaRecordContext.getImmediateEnclosingScope());
    }

    public static KafkaTrafficCaptureSource buildKafkaSource(@NonNull RootReplayerContext rootReplayerContext, @NonNull String str, @NonNull String str2, @NonNull String str3, boolean z, String str4, @NonNull Clock clock, @NonNull KafkaBehavioralPolicy kafkaBehavioralPolicy) throws IOException {
        if (rootReplayerContext == null) {
            throw new NullPointerException("globalContext is marked non-null but is null");
        }
        if (str == null) {
            throw new NullPointerException("brokers is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("topic is marked non-null but is null");
        }
        if (str3 == null) {
            throw new NullPointerException("groupId is marked non-null but is null");
        }
        if (clock == null) {
            throw new NullPointerException("clock is marked non-null but is null");
        }
        if (kafkaBehavioralPolicy == null) {
            throw new NullPointerException("behavioralPolicy is marked non-null but is null");
        }
        Properties buildKafkaProperties = buildKafkaProperties(str, str3, z, str4);
        buildKafkaProperties.putIfAbsent(MAX_POLL_INTERVAL_KEY, DEFAULT_POLL_INTERVAL_MS);
        return new KafkaTrafficCaptureSource(rootReplayerContext, new KafkaConsumer(buildKafkaProperties), str2, getKeepAlivePeriodFromPollPeriod(Duration.ofMillis(Long.valueOf((String) buildKafkaProperties.get(MAX_POLL_INTERVAL_KEY)).longValue())), clock, kafkaBehavioralPolicy);
    }

    private static Duration getKeepAlivePeriodFromPollPeriod(Duration duration) {
        return duration.dividedBy(2L);
    }

    public static Properties buildKafkaProperties(@NonNull String str, @NonNull String str2, boolean z, String str3) throws IOException {
        if (str == null) {
            throw new NullPointerException("brokers is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("groupId is marked non-null but is null");
        }
        Properties properties = new Properties();
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.setProperty("enable.auto.commit", "false");
        properties.setProperty("auto.offset.reset", "earliest");
        if (str3 != null) {
            try {
                FileInputStream fileInputStream = new FileInputStream(str3);
                try {
                    properties.load(fileInputStream);
                    fileInputStream.close();
                } finally {
                }
            } catch (IOException e) {
                log.error("Unable to load properties from kafka properties file with path: {}", str3);
                throw e;
            }
        }
        if (z) {
            properties.setProperty("security.protocol", "SASL_SSL");
            properties.setProperty("sasl.mechanism", "AWS_MSK_IAM");
            properties.setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;");
            properties.setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }
        properties.setProperty("bootstrap.servers", str);
        properties.setProperty("group.id", str2);
        return properties;
    }

    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public void touch(ITrafficSourceContexts.IBackPressureBlockContext iBackPressureBlockContext) {
        CompletableFuture.runAsync(() -> {
            this.trackingKafkaConsumer.touch(iBackPressureBlockContext);
        }, this.kafkaExecutor).get();
    }

    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public Optional<Instant> getNextRequiredTouch() {
        return this.trackingKafkaConsumer.getNextRequiredTouch();
    }

    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public CompletableFuture<List<ITrafficStreamWithKey>> readNextTrafficStreamChunk(Supplier<ITrafficSourceContexts.IReadChunkContext> supplier) {
        log.atTrace().setMessage("readNextTrafficStreamChunk()").log();
        return CompletableFuture.supplyAsync(() -> {
            log.atTrace().setMessage("async...readNextTrafficStreamChunk()").log();
            return readNextTrafficStreamSynchronously((ITrafficSourceContexts.IReadChunkContext) supplier.get());
        }, this.kafkaExecutor);
    }

    public List<ITrafficStreamWithKey> readNextTrafficStreamSynchronously(ITrafficSourceContexts.IReadChunkContext iReadChunkContext) {
        log.atTrace().setMessage("readNextTrafficStreamSynchronously()").log();
        try {
            return (List) this.trackingKafkaConsumer.getNextBatchOfRecords(iReadChunkContext, (kafkaCommitOffsetData, consumerRecord) -> {
                try {
                    TrafficStream parseFrom = TrafficStream.parseFrom((byte[]) consumerRecord.value());
                    log.atTrace().setMessage("Parsed traffic stream #{}: {} {}").addArgument(Long.valueOf(this.trafficStreamsRead.incrementAndGet())).addArgument(kafkaCommitOffsetData).addArgument(parseFrom).log();
                    return new PojoTrafficStreamAndKey(parseFrom, new TrafficStreamKeyWithKafkaRecordId(iTrafficStreamKey -> {
                        return this.channelContextManager.getGlobalContext().createTrafficStreamContextForKafkaSource(this.channelContextManager.retainOrCreateContext(iTrafficStreamKey), (String) consumerRecord.key(), consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize());
                    }, parseFrom, kafkaCommitOffsetData));
                } catch (InvalidProtocolBufferException e) {
                    RuntimeException onInvalidKafkaRecord = this.behavioralPolicy.onInvalidKafkaRecord(consumerRecord, e);
                    if (onInvalidKafkaRecord != null) {
                        throw onInvalidKafkaRecord;
                    }
                    return null;
                }
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
        } catch (Exception e) {
            log.atError().setCause(e).setMessage("Terminating Kafka traffic stream due to exception").log();
            throw e;
        }
    }

    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public ITrafficCaptureSource.CommitResult commitTrafficStream(ITrafficStreamKey iTrafficStreamKey) {
        if (iTrafficStreamKey instanceof TrafficStreamKeyWithKafkaRecordId) {
            return this.trackingKafkaConsumer.commitKafkaKey(iTrafficStreamKey, (TrafficStreamKeyWithKafkaRecordId) iTrafficStreamKey);
        }
        throw new IllegalArgumentException("Expected key of type " + TrafficStreamKeyWithKafkaRecordId.class + " but received " + iTrafficStreamKey + " (of type=" + iTrafficStreamKey.getClass() + ")");
    }

    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource, java.lang.AutoCloseable
    public void close() throws IOException, InterruptedException, ExecutionException {
        if (this.isClosed.compareAndSet(false, true)) {
            ExecutorService executorService = this.kafkaExecutor;
            TrackingKafkaConsumer trackingKafkaConsumer = this.trackingKafkaConsumer;
            Objects.requireNonNull(trackingKafkaConsumer);
            executorService.submit(trackingKafkaConsumer::close).get();
            this.kafkaExecutor.shutdownNow();
        }
    }
}
