package one.microstream.afs.kafka.types;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import one.microstream.X;
import one.microstream.afs.kafka.types.Blob;
import one.microstream.chars.XChars;
import one.microstream.collections.BulkList;
import one.microstream.collections.ConstList;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:one/microstream/afs/kafka/types/TopicIndex.class */
public interface TopicIndex extends AutoCloseable {

    /* loaded from: input_file:one/microstream/afs/kafka/types/TopicIndex$Default.class */
    public static class Default implements TopicIndex {
        private final Properties kafkaProperties;
        private final String topic;
        private BulkList<Blob> blobs;
        private Producer<String, byte[]> producer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public static String indexTopicName(String str) {
            return "__" + str + "_index";
        }

        static int getInt(byte[] bArr, int i) {
            return (bArr[i + 3] & 255) + ((bArr[i + 2] & 255) << 8) + ((bArr[i + 1] & 255) << 16) + (bArr[i] << 24);
        }

        static long getLong(byte[] bArr, int i) {
            return (bArr[i + 7] & 255) + ((bArr[i + 6] & 255) << 8) + ((bArr[i + 5] & 255) << 16) + ((bArr[i + 4] & 255) << 24) + ((bArr[i + 3] & 255) << 32) + ((bArr[i + 2] & 255) << 40) + ((bArr[i + 1] & 255) << 48) + (bArr[i] << 56);
        }

        static void putInt(byte[] bArr, int i, int i2) {
            bArr[i + 3] = (byte) i2;
            bArr[i + 2] = (byte) (i2 >>> 8);
            bArr[i + 1] = (byte) (i2 >>> 16);
            bArr[i] = (byte) (i2 >>> 24);
        }

        static void putLong(byte[] bArr, int i, long j) {
            bArr[i + 7] = (byte) j;
            bArr[i + 6] = (byte) (j >>> 8);
            bArr[i + 5] = (byte) (j >>> 16);
            bArr[i + 4] = (byte) (j >>> 24);
            bArr[i + 3] = (byte) (j >>> 32);
            bArr[i + 2] = (byte) (j >>> 40);
            bArr[i + 1] = (byte) (j >>> 48);
            bArr[i] = (byte) (j >>> 56);
        }

        Default(Properties properties, String str) {
            this.kafkaProperties = properties;
            this.topic = str;
        }

        private String topicName() {
            return indexTopicName(this.topic);
        }

        private BulkList<Blob> ensureBlobs() {
            if (this.blobs == null) {
                synchronized (this) {
                    if (this.blobs == null) {
                        this.blobs = createBlobs();
                    }
                }
            }
            return this.blobs;
        }

        private BulkList<Blob> createBlobs() {
            BulkList<Blob> New = BulkList.New();
            Properties properties = new Properties();
            properties.setProperty("key.deserializer", StringDeserializer.class.getName());
            properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            properties.setProperty("group.id", "topicindex" + UUID.randomUUID().toString());
            properties.remove("auto.offset.reset");
            properties.remove("enable.auto.commit");
            properties.putAll(this.kafkaProperties);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            try {
                TopicPartition topicPartition = new TopicPartition(topicName(), 0);
                kafkaConsumer.assign(Arrays.asList(topicPartition));
                kafkaConsumer.seekToEnd(Arrays.asList(topicPartition));
                long position = kafkaConsumer.position(topicPartition) - 1;
                kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition));
                long j = -1;
                while (j < position) {
                    Iterator it = kafkaConsumer.poll(Duration.ofSeconds(1L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        byte[] bArr = (byte[]) consumerRecord.value();
                        New.add(new Blob.Default(this.topic, getInt(bArr, 0), getLong(bArr, 4), getLong(bArr, 12), getLong(bArr, 20)));
                        j = consumerRecord.offset();
                    }
                }
                kafkaConsumer.close();
                return New;
            } catch (Throwable th) {
                try {
                    kafkaConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }

        private Producer<String, byte[]> ensureProducer() {
            if (this.producer == null) {
                Properties properties = new Properties();
                properties.putAll(this.kafkaProperties);
                properties.setProperty("key.serializer", StringSerializer.class.getName());
                properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
                this.producer = new KafkaProducer(properties);
            }
            return this.producer;
        }

        private void internalProduce(Producer<String, byte[]> producer, Blob blob) {
            byte[] bArr = new byte[28];
            putInt(bArr, 0, blob.partition());
            putLong(bArr, 4, blob.offset());
            putLong(bArr, 12, blob.start());
            putLong(bArr, 20, blob.end());
            try {
                producer.send(new ProducerRecord(topicName(), bArr)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override // one.microstream.afs.kafka.types.TopicIndex
        public Iterable<Blob> get() {
            ConstList immure;
            BulkList<Blob> ensureBlobs = ensureBlobs();
            synchronized (ensureBlobs) {
                immure = ensureBlobs.immure();
            }
            return immure;
        }

        @Override // one.microstream.afs.kafka.types.TopicIndex
        public TopicIndex put(Iterable<Blob> iterable) {
            BulkList<Blob> ensureBlobs = ensureBlobs();
            synchronized (ensureBlobs) {
                Producer<String, byte[]> ensureProducer = ensureProducer();
                iterable.forEach(blob -> {
                    ensureBlobs.add(blob);
                    internalProduce(ensureProducer, blob);
                });
                ensureProducer.flush();
            }
            return this;
        }

        @Override // one.microstream.afs.kafka.types.TopicIndex
        public TopicIndex delete(Map<TopicPartition, RecordsToDelete> map) {
            BulkList<Blob> ensureBlobs = ensureBlobs();
            synchronized (ensureBlobs) {
                Map map2 = (Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
                    return Integer.valueOf(((TopicPartition) entry.getKey()).partition());
                }, entry2 -> {
                    return (RecordsToDelete) entry2.getValue();
                }));
                if (ensureBlobs.removeBy(blob -> {
                    RecordsToDelete recordsToDelete = (RecordsToDelete) map2.get(Integer.valueOf(blob.partition()));
                    return recordsToDelete != null && recordsToDelete.beforeOffset() > blob.offset();
                }) > 0) {
                    try {
                        AdminClient create = AdminClient.create(this.kafkaProperties);
                        try {
                            create.deleteRecords(map).all().get();
                            if (create != null) {
                                create.close();
                            }
                            Producer<String, byte[]> ensureProducer = ensureProducer();
                            ensureBlobs.forEach(blob2 -> {
                                internalProduce(ensureProducer, blob2);
                            });
                            ensureProducer.flush();
                        } catch (Throwable th) {
                            if (create != null) {
                                try {
                                    create.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
            return this;
        }

        @Override // one.microstream.afs.kafka.types.TopicIndex, java.lang.AutoCloseable
        public synchronized void close() {
            if (this.blobs != null) {
                this.blobs.clear();
                this.blobs = null;
            }
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
        }
    }

    Iterable<Blob> get();

    TopicIndex put(Iterable<Blob> iterable);

    TopicIndex delete(Map<TopicPartition, RecordsToDelete> map);

    @Override // java.lang.AutoCloseable
    void close();

    static TopicIndex New(Properties properties, String str) {
        return new Default((Properties) X.notNull(properties), (String) XChars.notEmpty(str));
    }
}
