package one.microstream.afs.kafka.types;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import one.microstream.X;
import one.microstream.afs.blobstore.types.BlobStoreConnector;
import one.microstream.afs.blobstore.types.BlobStorePath;
import one.microstream.afs.kafka.types.TopicIndex;
import one.microstream.collections.BulkList;
import one.microstream.collections.EqHashTable;
import one.microstream.exceptions.IORuntimeException;
import one.microstream.io.ByteBufferInputStream;
import one.microstream.io.LimitedInputStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:one/microstream/afs/kafka/types/KafkaConnector.class */
public interface KafkaConnector extends BlobStoreConnector {

    /* loaded from: input_file:one/microstream/afs/kafka/types/KafkaConnector$Default.class */
    public static class Default extends BlobStoreConnector.Abstract<Blob> implements KafkaConnector {
        private final Properties kafkaProperties;
        private final FileSystemIndex fileSystemIndex;
        private final EqHashTable<String, TopicIndex> topicIndices;
        private final EqHashTable<String, KafkaConsumer<String, byte[]>> kafkaConsumers;
        private final EqHashTable<String, KafkaProducer<String, byte[]>> kafkaProducers;

        static String topicName(BlobStorePath blobStorePath) {
            return Pattern.compile("[^a-zA-Z0-9\\._\\-]").matcher(blobStorePath.fullQualifiedName().replace('/', '_')).replaceAll("_");
        }

        Default(Properties properties, boolean z) {
            super((v0) -> {
                return v0.topic();
            }, (v0) -> {
                return v0.size();
            }, KafkaPathValidator.New(), z);
            this.kafkaProperties = properties;
            this.fileSystemIndex = FileSystemIndex.New(properties);
            this.topicIndices = EqHashTable.New();
            this.kafkaConsumers = EqHashTable.New();
            this.kafkaProducers = EqHashTable.New();
        }

        private synchronized TopicIndex topicIndex(BlobStorePath blobStorePath) {
            return (TopicIndex) this.topicIndices.ensure(topicName(blobStorePath), str -> {
                return TopicIndex.New(this.kafkaProperties, str);
            });
        }

        private synchronized KafkaConsumer<String, byte[]> consumer(String str) {
            return (KafkaConsumer) this.kafkaConsumers.ensure(str, this::createConsumer);
        }

        private KafkaConsumer<String, byte[]> createConsumer(String str) {
            Properties properties = new Properties();
            properties.setProperty("key.deserializer", StringDeserializer.class.getName());
            properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            properties.setProperty("max.poll.records", "1");
            properties.remove("group.id");
            properties.remove("auto.offset.reset");
            properties.remove("enable.auto.commit");
            properties.putAll(this.kafkaProperties);
            return new KafkaConsumer<>(properties);
        }

        private synchronized KafkaProducer<String, byte[]> producer(String str) {
            return (KafkaProducer) this.kafkaProducers.ensure(str, this::createProducer);
        }

        private KafkaProducer<String, byte[]> createProducer(String str) {
            Properties properties = new Properties();
            properties.setProperty("key.serializer", StringSerializer.class.getName());
            properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
            properties.putAll(this.kafkaProperties);
            return new KafkaProducer<>(properties);
        }

        protected Stream<Blob> blobs(BlobStorePath blobStorePath) {
            Iterable<Blob> iterable = topicIndex(blobStorePath).get();
            return iterable != null ? StreamSupport.stream(iterable.spliterator(), false) : Stream.empty();
        }

        protected Stream<String> childKeys(BlobStorePath blobStorePath) {
            Pattern compile = Pattern.compile(childKeysRegexWithContainer(blobStorePath));
            return this.fileSystemIndex.files().filter(str -> {
                return compile.matcher(str).matches();
            });
        }

        protected String fileNameOfKey(String str) {
            return str.substring(str.lastIndexOf(47) + 1);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void internalReadBlobData(BlobStorePath blobStorePath, Blob blob, ByteBuffer byteBuffer, long j, long j2) {
            KafkaConsumer<String, byte[]> consumer = consumer(blob.topic());
            TopicPartition topicPartition = new TopicPartition(topicName(blobStorePath), blob.partition());
            consumer.assign(Arrays.asList(topicPartition));
            consumer.seek(topicPartition, blob.offset());
            int i = 0;
            do {
                ConsumerRecords poll = consumer.poll(Duration.ofSeconds(3L));
                if (!poll.isEmpty()) {
                    byteBuffer.put((byte[]) ((ConsumerRecord) poll.iterator().next()).value(), X.checkArrayRange(j), X.checkArrayRange(j2));
                    return;
                }
                i++;
            } while (i < 3);
            RuntimeException runtimeException = new RuntimeException("No data available for " + blobStorePath.fullQualifiedName() + ", offset=" + j + ", length=" + runtimeException);
            throw runtimeException;
        }

        protected boolean internalDeleteFile(BlobStorePath blobStorePath) {
            try {
                String str = topicName(blobStorePath);
                AdminClient create = AdminClient.create(this.kafkaProperties);
                try {
                    create.deleteTopics(Arrays.asList(str, TopicIndex.Default.indexTopicName(str))).all().get();
                    if (create != null) {
                        create.close();
                    }
                    synchronized (this) {
                        this.fileSystemIndex.delete(blobStorePath.fullQualifiedName());
                        Optional.ofNullable((TopicIndex) this.topicIndices.removeFor(str)).ifPresent((v0) -> {
                            v0.close();
                        });
                        Optional.ofNullable((KafkaConsumer) this.kafkaConsumers.removeFor(str)).ifPresent((v0) -> {
                            v0.close();
                        });
                        Optional.ofNullable((KafkaProducer) this.kafkaProducers.removeFor(str)).ifPresent((v0) -> {
                            v0.close();
                        });
                    }
                    return true;
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        protected boolean internalDeleteBlobs(BlobStorePath blobStorePath, List<? extends Blob> list) {
            String str = topicName(blobStorePath);
            Map<TopicPartition, RecordsToDelete> map = (Map) ((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
                return v0.partition();
            }, Collectors.summarizingLong((v0) -> {
                return v0.offset();
            })))).entrySet().stream().collect(Collectors.toMap(entry -> {
                return new TopicPartition(str, ((Integer) entry.getKey()).intValue());
            }, entry2 -> {
                return RecordsToDelete.beforeOffset(((LongSummaryStatistics) entry2.getValue()).getMax() + 1);
            }));
            ArrayList arrayList = new ArrayList();
            List<Blob> list2 = (List) blobs(blobStorePath).collect(Collectors.toList());
            list2.removeAll(list);
            for (Blob blob : list2) {
                ByteBuffer allocateDirect = ByteBuffer.allocateDirect(X.checkArrayRange(blob.size()));
                internalReadBlobData(blobStorePath, blob, allocateDirect, 0L, blob.size());
                allocateDirect.flip();
                arrayList.add(allocateDirect);
            }
            try {
                AdminClient create = AdminClient.create(this.kafkaProperties);
                try {
                    create.deleteRecords(map).all().get();
                    if (create != null) {
                        create.close();
                    }
                    synchronized (this) {
                        TopicIndex topicIndex = (TopicIndex) this.topicIndices.get(str);
                        if (topicIndex != null) {
                            topicIndex.delete(map);
                        }
                    }
                    internalWriteData(blobStorePath, arrayList);
                    return true;
                } finally {
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        protected long internalWriteData(BlobStorePath blobStorePath, Iterable<? extends ByteBuffer> iterable) {
            int read;
            try {
                KafkaProducer<String, byte[]> producer = producer(blobStorePath.fullQualifiedName());
                String str = topicName(blobStorePath);
                long j = totalSize(iterable);
                BulkList New = BulkList.New();
                ByteBufferInputStream New2 = ByteBufferInputStream.New(iterable);
                long j2 = j;
                long fileSize = fileSize(blobStorePath);
                while (j2 > 0) {
                    long min = Math.min(j2, 1000000L);
                    LimitedInputStream New3 = LimitedInputStream.New(new BufferedInputStream(New2), min);
                    try {
                        byte[] bArr = new byte[X.checkArrayRange(min)];
                        int length = bArr.length;
                        while (length > 0 && (read = New3.read(bArr, 0, Math.min(bArr.length, length))) != -1) {
                            length -= read;
                        }
                        RecordMetadata recordMetadata = (RecordMetadata) producer.send(new ProducerRecord(str, bArr)).get();
                        New.add(Blob.New(str, recordMetadata.partition(), recordMetadata.offset(), fileSize, (fileSize + min) - 1));
                        if (New3 != null) {
                            New3.close();
                        }
                        j2 -= min;
                        fileSize += min;
                    } finally {
                    }
                }
                producer.flush();
                this.fileSystemIndex.put(blobStorePath.fullQualifiedName());
                topicIndex(blobStorePath).put(New);
                return j;
            } catch (IOException e) {
                throw new IORuntimeException(e);
            } catch (Exception e2) {
                throw new RuntimeException(e2);
            }
        }

        protected synchronized void internalClose() {
            this.fileSystemIndex.close();
            this.topicIndices.values().forEach((v0) -> {
                v0.close();
            });
            this.topicIndices.clear();
            this.kafkaConsumers.values().forEach((v0) -> {
                v0.close();
            });
            this.kafkaConsumers.clear();
            this.kafkaProducers.values().forEach((v0) -> {
                v0.close();
            });
            this.kafkaProducers.clear();
        }
    }

    static KafkaConnector New(Properties properties) {
        return new Default((Properties) X.notNull(properties), false);
    }

    static KafkaConnector Caching(Properties properties) {
        return new Default((Properties) X.notNull(properties), true);
    }
}
