package org.creekservice.internal.kafka.streams.test.extension.handler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/handler/TopicConsumer.class */
/**
 * Consumes raw records from a single Kafka topic and deserializes their keys and values using the
 * topic's own deserialization methods.
 *
 * <p>The supplied {@link Consumer} is assigned partitions explicitly via {@link #assignAndSeek}
 * before records are read with {@link #consume}.
 */
final class TopicConsumer {
    // NOTE(review): the logger is registered under TopicExpectationHandler rather than this class.
    // Left unchanged as log configuration may depend on the category name - confirm intent.
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicExpectationHandler.class);

    /** Timeout passed to each individual call to {@link Consumer#poll(Duration)}. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);

    private final KafkaTopic<?, ?> topic;
    private final Consumer<byte[], byte[]> consumer;
    private final Clock clock;

    /** Thrown when the key or value of a consumed record can not be deserialized. */
    public static final class DeserializationException extends RuntimeException {
        /**
         * @param part which part of the record failed to deserialize, e.g. {@code key}.
         * @param record the record that could not be deserialized; used to build the message.
         * @param cause the underlying failure.
         */
        DeserializationException(String part, ConsumerRecord<?, ?> record, Throwable cause) {
            super(
                    "Failed to deserialize record "
                            + part
                            + ". topic: "
                            + record.topic()
                            + ", partition: "
                            + record.partition()
                            + ", offset: "
                            + record.offset(),
                    cause);
        }
    }

    /** Thrown when a call to the underlying Kafka consumer fails. */
    public static final class KafkaClientException extends RuntimeException {
        KafkaClientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Create a topic consumer that uses the system UTC clock.
     *
     * @param kafkaTopic the topic, used to deserialize keys and values.
     * @param consumer the Kafka consumer to read from.
     */
    public TopicConsumer(KafkaTopic<?, ?> kafkaTopic, Consumer<byte[], byte[]> consumer) {
        this(kafkaTopic, consumer, Clock.systemUTC());
    }

    /**
     * Create a topic consumer with an explicit clock.
     *
     * @param kafkaTopic the topic, used to deserialize keys and values.
     * @param consumer the Kafka consumer to read from.
     * @param clock the clock consulted by {@link #consume} to detect its deadline.
     * @throws NullPointerException if any argument is {@code null}.
     */
    TopicConsumer(KafkaTopic<?, ?> kafkaTopic, Consumer<byte[], byte[]> consumer, Clock clock) {
        this.topic = Objects.requireNonNull(kafkaTopic, "topic");
        this.consumer = Objects.requireNonNull(consumer, "consumer");
        this.clock = Objects.requireNonNull(clock, "clock");
    }

    /**
     * Assign the consumer to the supplied partitions and seek each one to its starting offset.
     *
     * @param startOffsets the offset to start consuming from, keyed by partition.
     * @throws KafkaClientException if assigning the partitions or seeking to an offset fails.
     */
    public void assignAndSeek(Map<TopicPartition, Long> startOffsets) {
        try {
            consumer.assign(startOffsets.keySet());
        } catch (Exception e) {
            throw new KafkaClientException("Failed to assign topic partitions", e);
        }

        // Seek in its own try block so a seek failure is reported as such, rather than being
        // caught a second time and misreported as an assignment failure.
        try {
            startOffsets.forEach(consumer::seek);
        } catch (Exception e) {
            throw new KafkaClientException("Failed to seek topic partition to starting offset", e);
        }

        LOGGER.info(
                "Consuming expected output from {}, with starting offsets: {}",
                topic.name(),
                startOffsets);
    }

    /**
     * Poll the topic until at least {@code minRecords} records have been consumed or the clock
     * reaches {@code end}, whichever comes first.
     *
     * <p>Every record returned by a poll is kept, so the result may contain more than {@code
     * minRecords} records. The deadline is only checked between polls.
     *
     * @param minRecords the number of records after which polling stops.
     * @param end the instant after which no further polls are started.
     * @return the consumed records, in the order they were returned by the consumer.
     * @throws KafkaClientException if polling the consumer fails.
     * @throws DeserializationException if a record's key or value can not be deserialized.
     */
    public List<ConsumedRecord> consume(long minRecords, Instant end) {
        final List<ConsumedRecord> consumed = new ArrayList<>();
        while (consumed.size() < minRecords && clock.instant().isBefore(end)) {
            for (final ConsumerRecord<byte[], byte[]> record : poll()) {
                final ConsumedRecord result =
                        new ConsumedRecord(record, deserializeKey(record), deserializeValue(record));
                LOGGER.debug("Consumed: {}", result);
                consumed.add(result);
            }
        }
        return consumed;
    }

    /** Poll the consumer once, wrapping any failure in a {@link KafkaClientException}. */
    private ConsumerRecords<byte[], byte[]> poll() {
        try {
            return consumer.poll(POLL_TIMEOUT);
        } catch (Exception e) {
            throw new KafkaClientException("Failed to consume records from Kafka", e);
        }
    }

    /** Deserialize the record's key; a {@code null} result is mapped to an empty optional. */
    private Optional<?> deserializeKey(ConsumerRecord<byte[], byte[]> record) {
        try {
            return Optional.ofNullable(topic.deserializeKey(record.key()));
        } catch (Exception e) {
            throw new DeserializationException("key", record, e);
        }
    }

    /** Deserialize the record's value; a {@code null} result is mapped to an empty optional. */
    private Optional<?> deserializeValue(ConsumerRecord<byte[], byte[]> record) {
        try {
            return Optional.ofNullable(topic.deserializeValue(record.value()));
        } catch (Exception e) {
            throw new DeserializationException("value", record, e);
        }
    }
}
