package in.rcard.kafkaesque.producer;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;

/* loaded from: input_file:in/rcard/kafkaesque/producer/KafkaesqueProducer.class */
/**
 * Wrapper around a {@link KafkaProducer} that sends records to a single, pre-configured topic
 * and waits, for at most a configured duration, until the send callbacks report completion.
 *
 * <p>Any failure while waiting (timeout, broker-side error, interruption) is reported to the
 * caller as an {@link AssertionError} carrying the original exception as its cause.
 *
 * <p>NOTE(review): this class never closes the underlying {@link KafkaProducer}; confirm whether
 * the owner of this object is expected to handle that elsewhere.
 *
 * @param <Key> type of the record keys
 * @param <Value> type of the record values
 */
public final class KafkaesqueProducer<Key, Value> {
    private final KafkaProducer<Key, Value> kafkaProducer;
    private final DelegateCreationInfo<Key, Value> creationInfo;
    private final Duration forEachAckDuration;

    /**
     * Immutable holder for the information needed to build the underlying producer: the target
     * topic and the key/value serializers.
     *
     * <p>NOTE(review): decompiler metadata marks this type as originally package-private.
     *
     * @param <Key> type of the record keys
     * @param <Value> type of the record values
     */
    public static class DelegateCreationInfo<Key, Value> {
        private final String topic;
        private final Serializer<Key> keySerializer;
        private final Serializer<Value> valueSerializer;

        /**
         * @param str name of the topic records are sent to
         * @param serializer serializer for record keys
         * @param serializer2 serializer for record values
         */
        public DelegateCreationInfo(String str, Serializer<Key> serializer, Serializer<Value> serializer2) {
            this.topic = str;
            this.keySerializer = serializer;
            this.valueSerializer = serializer2;
        }

        /** @return the topic name given at construction */
        public String getTopic() {
            return this.topic;
        }

        /** @return the key serializer given at construction */
        public Serializer<Key> getKeySerializer() {
            return this.keySerializer;
        }

        /** @return the value serializer given at construction */
        public Serializer<Value> getValueSerializer() {
            return this.valueSerializer;
        }
    }

    /**
     * Topic-agnostic key/value pair. It is turned into a Kafka {@link ProducerRecord} through
     * {@link #toPr(String)} once the destination topic is known.
     *
     * @param <Key> type of the key
     * @param <Value> type of the value
     */
    public static class Record<Key, Value> {
        private final Key key;
        private final Value value;

        private Record(Key key, Value value) {
            this.key = key;
            this.value = value;
        }

        /**
         * Creates a record from a key and a value.
         *
         * @param key the record key
         * @param value the record value
         * @return a new record holding the given pair
         */
        public static <Key, Value> Record<Key, Value> of(Key key, Value value) {
            return new Record<>(key, value);
        }

        /**
         * Creates a record by copying the key and value of a Kafka {@link ProducerRecord}.
         * Only the key and value are copied; nothing else of the source record is retained.
         *
         * @param producerRecord the source record
         * @return a new record holding the source's key and value
         */
        public static <Key, Value> Record<Key, Value> of(ProducerRecord<Key, Value> producerRecord) {
            return new Record<>(producerRecord.key(), producerRecord.value());
        }

        /**
         * Builds the Kafka {@link ProducerRecord} for this pair, addressed to the given topic.
         *
         * @param str name of the destination topic
         * @return a producer record with this record's key and value
         */
        public ProducerRecord<Key, Value> toPr(String str) {
            return new ProducerRecord<>(str, this.key, this.value);
        }

        /** @return the record key */
        public Key getKey() {
            return this.key;
        }

        /** @return the record value */
        public Value getValue() {
            return this.value;
        }

        /** Two records are equal when they have the same runtime class, key and value. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            // Wildcard instead of a raw type: the type arguments are erased at runtime anyway.
            Record<?, ?> other = (Record<?, ?>) obj;
            return Objects.equals(this.key, other.key) && Objects.equals(this.value, other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.value);
        }

        @Override
        public String toString() {
            return "Record{key=" + this.key + ", value=" + this.value + "}";
        }
    }

    /**
     * Creates the wrapper and immediately builds the underlying {@link KafkaProducer}.
     *
     * <p>NOTE(review): decompiler metadata marks this constructor as originally package-private.
     *
     * @param str value used for the producer's {@code bootstrap.servers} property
     * @param duration maximum time to wait for acknowledgement in the send methods
     * @param delegateCreationInfo topic and serializers for the underlying producer
     * @throws NullPointerException if {@code duration} or {@code delegateCreationInfo} is null
     */
    public KafkaesqueProducer(String str, Duration duration, DelegateCreationInfo<Key, Value> delegateCreationInfo) {
        // Fail fast here rather than with an obscure NPE on the first send.
        this.forEachAckDuration = Objects.requireNonNull(duration, "duration");
        this.creationInfo = Objects.requireNonNull(delegateCreationInfo, "delegateCreationInfo");
        // Must run after creationInfo is assigned: createKafkaProducer reads the serializers from it.
        this.kafkaProducer = createKafkaProducer(str);
    }

    /**
     * Builds the underlying producer with {@code acks=all} and the serializer classes taken from
     * the creation info.
     *
     * <p>Only the serializers' classes are handed to Kafka, not the instances themselves.
     */
    private KafkaProducer<Key, Value> createKafkaProducer(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("acks", "all");
        properties.put("key.serializer", this.creationInfo.getKeySerializer().getClass());
        properties.put("value.serializer", this.creationInfo.getValueSerializer().getClass());
        return new KafkaProducer<>(properties);
    }

    /**
     * Sends a single record to the configured topic and waits for its send callback to complete.
     *
     * <p>NOTE(review): decompiler metadata marks this method as originally package-private.
     *
     * @param record the key/value pair to send
     * @return the {@link ProducerRecord} that was sent
     * @throws AssertionError if the send fails, the wait exceeds the configured duration, or the
     *     calling thread is interrupted (the interrupt flag is restored before throwing)
     */
    public ProducerRecord<Key, Value> sendRecord(Record<Key, Value> record) {
        final long timeoutMillis = this.forEachAckDuration.toMillis();
        try {
            ProducerRecord<Key, Value> pr = record.toPr(this.creationInfo.getTopic());
            sendSingleRecord(pr).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return pr;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            restoreInterruptIfNeeded(e);
            throw new AssertionError(
                    String.format("Impossible to send a the record %s in %d milliseconds", record, timeoutMillis), e);
        }
    }

    /**
     * Sends every record of the list to the configured topic and waits for all send callbacks to
     * complete. The configured duration bounds the wait for the whole batch, not each record.
     *
     * <p>The same error message is used whether the wait timed out, a send failed, or the thread
     * was interrupted; inspect the {@link AssertionError}'s cause to tell them apart.
     *
     * <p>NOTE(review): decompiler metadata marks this method as originally package-private.
     *
     * @param list the key/value pairs to send, in order
     * @return the {@link ProducerRecord}s that were sent, in the same order as {@code list}
     * @throws AssertionError if any send fails, the wait exceeds the configured duration, or the
     *     calling thread is interrupted (the interrupt flag is restored before throwing)
     */
    public List<ProducerRecord<Key, Value>> sendRecords(List<Record<Key, Value>> list) {
        final long timeoutMillis = this.forEachAckDuration.toMillis();
        final String topic = this.creationInfo.getTopic();
        final List<ProducerRecord<Key, Value>> producerRecords =
                list.stream().map(record -> record.toPr(topic)).collect(Collectors.toList());
        try {
            // All sends are issued up front; the array only exists because allOf takes varargs.
            final CompletableFuture<?>[] pendingAcks = producerRecords.stream()
                    .map(this::sendSingleRecord)
                    .toArray(size -> new CompletableFuture<?>[size]);
            CompletableFuture.allOf(pendingAcks).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return producerRecords;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            restoreInterruptIfNeeded(e);
            throw new AssertionError(
                    String.format(
                            "At least the sending of one record of the list %s takes more than %d milliseconds",
                            list, timeoutMillis),
                    e);
        }
    }

    /**
     * Hands the record to the underlying producer and bridges its callback to a
     * {@link CompletableFuture}: completed with the metadata on success, or exceptionally with the
     * exception the callback reports.
     */
    private CompletableFuture<RecordMetadata> sendSingleRecord(ProducerRecord<Key, Value> producerRecord) {
        final CompletableFuture<RecordMetadata> ack = new CompletableFuture<>();
        this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc == null) {
                ack.complete(recordMetadata);
            } else {
                ack.completeExceptionally(exc);
            }
        });
        return ack;
    }

    /**
     * Re-asserts the current thread's interrupt flag when the failure was an interruption, so
     * that code further up the stack can still observe it after it is wrapped in an
     * {@link AssertionError}.
     */
    private static void restoreInterruptIfNeeded(Exception failure) {
        if (failure instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}
