package io.pravega.segmentstore.server.tables;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Runnables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ArrayView;
import io.pravega.common.util.AsyncIterator;
import io.pravega.common.util.ByteArraySegment;
import io.pravega.common.util.IllegalDataFormatException;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.StreamSegmentTruncatedException;
import io.pravega.segmentstore.contracts.tables.IteratorItem;
import io.pravega.segmentstore.contracts.tables.TableAttributes;
import io.pravega.segmentstore.contracts.tables.TableEntry;
import io.pravega.segmentstore.contracts.tables.TableKey;
import io.pravega.segmentstore.server.CacheManager;
import io.pravega.segmentstore.server.DirectSegmentAccess;
import io.pravega.segmentstore.server.SegmentContainer;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.tables.TableBucketReader;
import io.pravega.segmentstore.server.tables.TableIterator;
import io.pravega.segmentstore.storage.CacheFactory;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/tables/ContainerTableExtensionImpl.class */
public class ContainerTableExtensionImpl implements ContainerTableExtension {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private static final int MAX_BATCH_SIZE = 33554432;
    private static final int DEFAULT_MAX_COMPACTION_SIZE = 4194304;
    private final SegmentContainer segmentContainer;
    private final ScheduledExecutorService executor;
    private final KeyHasher hasher;
    private final ContainerKeyIndex keyIndex;
    private final EntrySerializer serializer;
    private final AtomicBoolean closed;
    private final String traceObjectId;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:io/pravega/segmentstore/server/tables/ContainerTableExtensionImpl$GetBucketReader.class */
    public interface GetBucketReader<T> {
        TableBucketReader<T> apply(DirectSegmentAccess directSegmentAccess, TableBucketReader.GetBackpointer getBackpointer, ScheduledExecutorService scheduledExecutorService);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/tables/ContainerTableExtensionImpl$GetResultBuilder.class */
    public static class GetResultBuilder {
        private final List<ArrayView> keys;
        private final List<UUID> hashes;
        private final List<CompletableFuture<TableEntry>> resultFutures;

        GetResultBuilder(List<ArrayView> list, KeyHasher keyHasher) {
            this.keys = list;
            Stream<ArrayView> stream = list.stream();
            keyHasher.getClass();
            this.hashes = (List) stream.map(keyHasher::hash).collect(Collectors.toList());
            this.resultFutures = new ArrayList();
        }

        void includeResult(CompletableFuture<TableEntry> completableFuture) {
            this.resultFutures.add(completableFuture);
        }

        CompletableFuture<List<TableEntry>> getResultFutures() {
            return Futures.allOfWithResults(this.resultFutures);
        }

        @SuppressFBWarnings(justification = "generated code")
        public List<ArrayView> getKeys() {
            return this.keys;
        }

        @SuppressFBWarnings(justification = "generated code")
        public List<UUID> getHashes() {
            return this.hashes;
        }
    }

    /* loaded from: input_file:io/pravega/segmentstore/server/tables/ContainerTableExtensionImpl$IteratorItemImpl.class */
    private class IteratorItemImpl<T> implements IteratorItem<T> {
        private final IteratorState state;
        private final Collection<T> entries;

        public ArrayView getState() {
            return this.state.serialize();
        }

        public String toString() {
            return String.format("State = %s, EntryCount = %s", this.state, Integer.valueOf(this.entries.size()));
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"state", "entries"})
        public IteratorItemImpl(IteratorState iteratorState, Collection<T> collection) {
            this.state = iteratorState;
            this.entries = collection;
        }

        @SuppressFBWarnings(justification = "generated code")
        public Collection<T> getEntries() {
            return this.entries;
        }
    }

    /* loaded from: input_file:io/pravega/segmentstore/server/tables/ContainerTableExtensionImpl$TableWriterConnectorImpl.class */
    private class TableWriterConnectorImpl implements TableWriterConnector {
        private final SegmentMetadata metadata;

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        public EntrySerializer getSerializer() {
            return ContainerTableExtensionImpl.this.serializer;
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        public KeyHasher getKeyHasher() {
            return ContainerTableExtensionImpl.this.hasher;
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        public CompletableFuture<DirectSegmentAccess> getSegment(Duration duration) {
            return ContainerTableExtensionImpl.this.segmentContainer.forSegment(this.metadata.getName(), duration);
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        public void notifyIndexOffsetChanged(long j) {
            ContainerTableExtensionImpl.this.keyIndex.notifyIndexOffsetChanged(this.metadata.getId(), j);
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        public int getMaxCompactionSize() {
            return ContainerTableExtensionImpl.this.getMaxCompactionSize();
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector, java.lang.AutoCloseable
        public void close() {
            ContainerTableExtensionImpl.this.keyIndex.notifyIndexOffsetChanged(this.metadata.getId(), -1L);
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"metadata"})
        public TableWriterConnectorImpl(SegmentMetadata segmentMetadata) {
            this.metadata = segmentMetadata;
        }

        @Override // io.pravega.segmentstore.server.tables.TableWriterConnector
        @SuppressFBWarnings(justification = "generated code")
        public SegmentMetadata getMetadata() {
            return this.metadata;
        }
    }

    public ContainerTableExtensionImpl(SegmentContainer segmentContainer, CacheFactory cacheFactory, CacheManager cacheManager, ScheduledExecutorService scheduledExecutorService) {
        this(segmentContainer, cacheFactory, cacheManager, KeyHasher.sha256(), scheduledExecutorService);
    }

    @VisibleForTesting
    ContainerTableExtensionImpl(@NonNull SegmentContainer segmentContainer, @NonNull CacheFactory cacheFactory, @NonNull CacheManager cacheManager, @NonNull KeyHasher keyHasher, @NonNull ScheduledExecutorService scheduledExecutorService) {
        if (segmentContainer == null) {
            throw new NullPointerException("segmentContainer is marked @NonNull but is null");
        }
        if (cacheFactory == null) {
            throw new NullPointerException("cacheFactory is marked @NonNull but is null");
        }
        if (cacheManager == null) {
            throw new NullPointerException("cacheManager is marked @NonNull but is null");
        }
        if (keyHasher == null) {
            throw new NullPointerException("hasher is marked @NonNull but is null");
        }
        if (scheduledExecutorService == null) {
            throw new NullPointerException("executor is marked @NonNull but is null");
        }
        this.segmentContainer = segmentContainer;
        this.executor = scheduledExecutorService;
        this.hasher = keyHasher;
        this.keyIndex = new ContainerKeyIndex(segmentContainer.getId(), cacheFactory, cacheManager, this.hasher, this.executor);
        this.serializer = new EntrySerializer();
        this.closed = new AtomicBoolean();
        this.traceObjectId = String.format("TableExtension[%d]", Integer.valueOf(this.segmentContainer.getId()));
    }

    @Override // io.pravega.segmentstore.server.SegmentContainerExtension, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.keyIndex.close();
        log.info("{}: Closed.", this.traceObjectId);
    }

    @Override // io.pravega.segmentstore.server.SegmentContainerExtension
    public Collection<WriterSegmentProcessor> createWriterSegmentProcessors(UpdateableSegmentMetadata updateableSegmentMetadata) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return !updateableSegmentMetadata.getAttributes().containsKey(TableAttributes.INDEX_OFFSET) ? Collections.emptyList() : Collections.singletonList(new WriterTableProcessor(new TableWriterConnectorImpl(updateableSegmentMetadata), this.executor));
    }

    public CompletableFuture<Void> createSegment(@NonNull String str, Duration duration) {
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        List list = (List) TableAttributes.DEFAULT_VALUES.entrySet().stream().map(entry -> {
            return new AttributeUpdate((UUID) entry.getKey(), AttributeUpdateType.None, ((Long) entry.getValue()).longValue());
        }).collect(Collectors.toList());
        logRequest("createSegment", str);
        return this.segmentContainer.createStreamSegment(str, list, duration);
    }

    public CompletableFuture<Void> deleteSegment(@NonNull String str, boolean z, Duration duration) {
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        logRequest("deleteSegment", str, Boolean.valueOf(z));
        if (z) {
            TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
            return this.segmentContainer.forSegment(str, timeoutTimer.getRemaining()).thenComposeAsync(directSegmentAccess -> {
                return this.keyIndex.executeIfEmpty(directSegmentAccess, () -> {
                    return this.segmentContainer.deleteStreamSegment(str, timeoutTimer.getRemaining());
                }, timeoutTimer);
            }, (Executor) this.executor);
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        return this.segmentContainer.deleteStreamSegment(str, duration);
    }

    public CompletableFuture<Void> merge(@NonNull String str, @NonNull String str2, Duration duration) {
        if (str == null) {
            throw new NullPointerException("targetSegmentName is marked @NonNull but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("sourceSegmentName is marked @NonNull but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        throw new UnsupportedOperationException("merge");
    }

    public CompletableFuture<Void> seal(String str, Duration duration) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        throw new UnsupportedOperationException("seal");
    }

    public CompletableFuture<List<Long>> put(@NonNull String str, @NonNull List<TableEntry> list, Duration duration) {
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        if (list == null) {
            throw new NullPointerException("entries is marked @NonNull but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        Function function = (v0) -> {
            return v0.getKey();
        };
        EntrySerializer entrySerializer = this.serializer;
        entrySerializer.getClass();
        TableKeyBatch batch = batch(list, function, entrySerializer::getUpdateLength, TableKeyBatch.update());
        logRequest("put", str, Boolean.valueOf(batch.isConditional()), Boolean.valueOf(batch.isRemoval()), Integer.valueOf(list.size()), Integer.valueOf(batch.getLength()));
        return this.segmentContainer.forSegment(str, timeoutTimer.getRemaining()).thenComposeAsync(directSegmentAccess -> {
            return this.keyIndex.update(directSegmentAccess, batch, () -> {
                int length = batch.getLength();
                EntrySerializer entrySerializer2 = this.serializer;
                entrySerializer2.getClass();
                return commit(list, length, entrySerializer2::serializeUpdate, directSegmentAccess, timeoutTimer.getRemaining());
            }, timeoutTimer);
        }, (Executor) this.executor);
    }

    public CompletableFuture<Void> remove(@NonNull String str, @NonNull Collection<TableKey> collection, Duration duration) {
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        if (collection == null) {
            throw new NullPointerException("keys is marked @NonNull but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        Function function = tableKey -> {
            return tableKey;
        };
        EntrySerializer entrySerializer = this.serializer;
        entrySerializer.getClass();
        TableKeyBatch batch = batch(collection, function, entrySerializer::getRemovalLength, TableKeyBatch.removal());
        logRequest("remove", str, Boolean.valueOf(batch.isConditional()), Boolean.valueOf(batch.isRemoval()), Integer.valueOf(collection.size()), Integer.valueOf(batch.getLength()));
        return this.segmentContainer.forSegment(str, timeoutTimer.getRemaining()).thenComposeAsync(directSegmentAccess -> {
            return this.keyIndex.update(directSegmentAccess, batch, () -> {
                int length = batch.getLength();
                EntrySerializer entrySerializer2 = this.serializer;
                entrySerializer2.getClass();
                return commit(collection, length, entrySerializer2::serializeRemoval, directSegmentAccess, timeoutTimer.getRemaining());
            }, timeoutTimer);
        }, (Executor) this.executor).thenRun(Runnables.doNothing());
    }

    public CompletableFuture<List<TableEntry>> get(@NonNull String str, @NonNull List<ArrayView> list, Duration duration) {
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        if (list == null) {
            throw new NullPointerException("keys is marked @NonNull but is null");
        }
        Exceptions.checkNotClosed(this.closed.get(), this);
        logRequest("get", str, Integer.valueOf(list.size()));
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        GetResultBuilder getResultBuilder = new GetResultBuilder(list, this.hasher);
        return this.segmentContainer.forSegment(str, timeoutTimer.getRemaining()).thenComposeAsync(directSegmentAccess -> {
            return this.keyIndex.getBucketOffsets(directSegmentAccess, getResultBuilder.getHashes(), timeoutTimer).thenComposeAsync(map -> {
                return get(directSegmentAccess, getResultBuilder, map, timeoutTimer);
            }, (Executor) this.executor);
        }, (Executor) this.executor);
    }

    private CompletableFuture<List<TableEntry>> get(DirectSegmentAccess directSegmentAccess, GetResultBuilder getResultBuilder, Map<UUID, Long> map, TimeoutTimer timeoutTimer) {
        ContainerKeyIndex containerKeyIndex = this.keyIndex;
        containerKeyIndex.getClass();
        TableBucketReader<TableEntry> entry = TableBucketReader.entry(directSegmentAccess, containerKeyIndex::getBackpointerOffset, this.executor);
        int size = getResultBuilder.getHashes().size();
        for (int i = 0; i < size; i++) {
            UUID uuid = getResultBuilder.getHashes().get(i);
            long longValue = map.get(uuid).longValue();
            if (longValue == -1) {
                getResultBuilder.includeResult(CompletableFuture.completedFuture(null));
            } else {
                ArrayView arrayView = getResultBuilder.getKeys().get(i);
                getResultBuilder.includeResult(Futures.exceptionallyExpecting(entry.find(arrayView, longValue, timeoutTimer), th -> {
                    return th instanceof StreamSegmentTruncatedException;
                }, (Object) null).thenComposeAsync(tableEntry -> {
                    return tableEntry != null ? CompletableFuture.completedFuture(maybeDeleted(tableEntry)) : this.keyIndex.getBucketOffsetDirect(directSegmentAccess, uuid, timeoutTimer).thenComposeAsync(l -> {
                        return entry.find(arrayView, l.longValue(), timeoutTimer);
                    }, (Executor) this.executor).thenApply((Function<? super U, ? extends U>) this::maybeDeleted);
                }, (Executor) this.executor));
            }
        }
        return getResultBuilder.getResultFutures();
    }

    public CompletableFuture<AsyncIterator<IteratorItem<TableKey>>> keyIterator(String str, byte[] bArr, Duration duration) {
        logRequest("keyIterator", str);
        return newIterator(str, bArr, duration, (v0, v1, v2) -> {
            return TableBucketReader.key(v0, v1, v2);
        });
    }

    public CompletableFuture<AsyncIterator<IteratorItem<TableEntry>>> entryIterator(String str, byte[] bArr, Duration duration) {
        logRequest("entryIterator", str);
        return newIterator(str, bArr, duration, (v0, v1, v2) -> {
            return TableBucketReader.entry(v0, v1, v2);
        });
    }

    @VisibleForTesting
    protected int getMaxCompactionSize() {
        return DEFAULT_MAX_COMPACTION_SIZE;
    }

    private <T> TableKeyBatch batch(Collection<T> collection, Function<T, TableKey> function, Function<T, Integer> function2, TableKeyBatch tableKeyBatch) {
        for (T t : collection) {
            Integer apply = function2.apply(t);
            TableKey apply2 = function.apply(t);
            tableKeyBatch.add(apply2, this.hasher.hash(apply2.getKey()), apply.intValue());
        }
        Preconditions.checkArgument(tableKeyBatch.getLength() <= MAX_BATCH_SIZE, "Update Batch length (%s) exceeds the maximum limit.", MAX_BATCH_SIZE);
        return tableKeyBatch;
    }

    private <T> CompletableFuture<Long> commit(Collection<T> collection, int i, BiConsumer<Collection<T>, byte[]> biConsumer, DirectSegmentAccess directSegmentAccess, Duration duration) {
        if (!$assertionsDisabled && i > MAX_BATCH_SIZE) {
            throw new AssertionError();
        }
        byte[] bArr = new byte[i];
        biConsumer.accept(collection, bArr);
        return directSegmentAccess.append(new ByteArraySegment(bArr), null, duration);
    }

    private <T> CompletableFuture<AsyncIterator<IteratorItem<T>>> newIterator(@NonNull String str, byte[] bArr, @NonNull Duration duration, @NonNull GetBucketReader<T> getBucketReader) {
        UUID keyHash;
        if (str == null) {
            throw new NullPointerException("segmentName is marked @NonNull but is null");
        }
        if (duration == null) {
            throw new NullPointerException("fetchTimeout is marked @NonNull but is null");
        }
        if (getBucketReader == null) {
            throw new NullPointerException("createBucketReader is marked @NonNull but is null");
        }
        if (bArr == null) {
            keyHash = null;
        } else {
            try {
                keyHash = IteratorState.deserialize(bArr).getKeyHash();
            } catch (IOException e) {
                throw new IllegalDataFormatException("Unable to deserialize `serializedState`.", e);
            }
        }
        UUID nextHash = KeyHasher.getNextHash(keyHash);
        return nextHash == null ? CompletableFuture.completedFuture(TableIterator.empty()) : (CompletableFuture<AsyncIterator<IteratorItem<T>>>) this.segmentContainer.forSegment(str, duration).thenComposeAsync(directSegmentAccess -> {
            return buildIterator(directSegmentAccess, getBucketReader, nextHash, duration);
        }, (Executor) this.executor);
    }

    private <T> CompletableFuture<AsyncIterator<IteratorItem<T>>> buildIterator(DirectSegmentAccess directSegmentAccess, GetBucketReader<T> getBucketReader, UUID uuid, Duration duration) {
        ContainerKeyIndex containerKeyIndex = this.keyIndex;
        containerKeyIndex.getClass();
        TableBucketReader<T> apply = getBucketReader.apply(directSegmentAccess, containerKeyIndex::getBackpointerOffset, this.executor);
        TableIterator.ConvertResult convertResult = tableBucket -> {
            return apply.findAllExisting(tableBucket.getSegmentOffset(), new TimeoutTimer(duration)).thenApply(list -> {
                return new IteratorItemImpl(new IteratorState(tableBucket.getHash()), list);
            });
        };
        return (CompletableFuture<AsyncIterator<IteratorItem<T>>>) this.keyIndex.getUnindexedKeyHashes(directSegmentAccess).thenComposeAsync(map -> {
            return TableIterator.builder().segment(directSegmentAccess).cacheHashes(map).firstHash(uuid).executor(this.executor).resultConverter(convertResult).fetchTimeout(duration).build();
        }, (Executor) this.executor);
    }

    private TableEntry maybeDeleted(TableEntry tableEntry) {
        if (tableEntry == null || tableEntry.getValue() == null) {
            return null;
        }
        return tableEntry;
    }

    private void logRequest(String str, Object... objArr) {
        log.debug("{}: {} {}", new Object[]{this.traceObjectId, str, objArr});
    }

    static {
        $assertionsDisabled = !ContainerTableExtensionImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ContainerTableExtensionImpl.class);
    }
}
