package io.pravega.segmentstore.server.attributes;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectBuilder;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.HashHelper;
import io.pravega.common.io.serialization.RevisionDataInput;
import io.pravega.common.io.serialization.RevisionDataOutput;
import io.pravega.common.io.serialization.VersionedSerializer;
import io.pravega.common.util.ArrayView;
import io.pravega.common.util.CollectionHelpers;
import io.pravega.common.util.Retry;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.AttributeUpdateType;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.BadOffsetException;
import io.pravega.segmentstore.contracts.ReadResultEntry;
import io.pravega.segmentstore.contracts.ReadResultEntryContents;
import io.pravega.segmentstore.contracts.ReadResultEntryType;
import io.pravega.segmentstore.contracts.StreamSegmentMergedException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.contracts.StreamSegmentTruncatedException;
import io.pravega.segmentstore.contracts.StreamingException;
import io.pravega.segmentstore.server.AttributeIndex;
import io.pravega.segmentstore.server.CacheManager;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.OperationLog;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.logs.operations.UpdateAttributesOperation;
import io.pravega.segmentstore.server.reading.AsyncReadResultHandler;
import io.pravega.segmentstore.server.reading.AsyncReadResultProcessor;
import io.pravega.segmentstore.server.reading.StreamSegmentStorageReader;
import io.pravega.segmentstore.storage.Cache;
import io.pravega.segmentstore.storage.ReadOnlyStorage;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/attributes/SegmentAttributeIndex.class */
public class SegmentAttributeIndex implements AttributeIndex, CacheManager.Client, AutoCloseable {
    /** Number of slots in the {@link #cacheEntries} array. */
    private static final int CACHE_BUCKETS = 64;
    /** Metadata of the segment being indexed; supplies the segment id (cache keys, trace id) and name. */
    private final SegmentMetadata segmentMetadata;
    /** Storage through which the attribute segment is opened, created, read, written, truncated and sealed. */
    private final Storage storage;
    /** Log that receives the snapshot-location attribute updates after a snapshot has been written. */
    private final OperationLog operationLog;
    /** Cache backing the serialized contents of every {@code CacheEntry}. */
    private final Cache cache;

    /** Last "current generation" value handed to {@code updateGenerations}. */
    @GuardedBy("cacheEntries")
    private int currentCacheGeneration;
    /** Configuration: attribute segment rolling policy, snapshot trigger size and read block size are read from it. */
    private final AttributeIndexConfig config;
    /** Executor on which the asynchronous continuations in this class run. */
    private final ScheduledExecutorService executor;
    /** Prefix used in every log message emitted by this instance ("AttributeIndex[segmentId]"). */
    private final String traceObjectId;

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(SegmentAttributeIndex.class);
    /** Retry policy for appends to the attribute segment: retries on BadOffsetException, fails on anything else. */
    private static final Retry.RetryAndThrowBase<Exception> APPEND_RETRY = Retry.withExpBackoff(10, 2, 10, 1000).retryingOn(BadOffsetException.class).throwingOn(Exception.class);
    /** Retry policy for reads of the attribute segment: retries on StreamSegmentTruncatedException, fails on anything else. */
    private static final Retry.RetryAndThrowBase<Exception> READ_RETRY = Retry.withExpBackoff(10, 2, 10, 1000).retryingOn(StreamSegmentTruncatedException.class).throwingOn(Exception.class);
    // Hash helper seeded with this class's name.
    // NOTE(review): its use is not visible in this part of the file - presumably it maps attribute ids to cache slots; confirm.
    private static final HashHelper HASH = HashHelper.seededWith(SegmentAttributeIndex.class.getName());
    /** Handle and known length of the attribute segment; stays null until {@code initialize} completes. */
    private final AtomicReference<AttributeSegment> attributeSegment = new AtomicReference<>();

    /** Cache entries, one optional entry per slot; the array itself is the monitor guarding its contents. */
    @GuardedBy("cacheEntries")
    private final CacheEntry[] cacheEntries = new CacheEntry[CACHE_BUCKETS];
    /** Set once by {@code close}; makes subsequent close calls no-ops. */
    private final AtomicBoolean closed = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A bag of Attribute Id-Value pairs, together with the builder and versioned serializer used to write it to,
     * and read it back from, a stream. When one collection is merged into another, a value of
     * {@code Long.MIN_VALUE} acts as a removal marker.
     */
    @VisibleForTesting
    public static class AttributeCollection {
        private static final AttributeCollectionSerializer SERIALIZER = new AttributeCollectionSerializer();
        private final Map<UUID, Long> attributes;

        /** Creates an empty collection. */
        private AttributeCollection() {
            this.attributes = new HashMap<>();
        }

        /**
         * Creates a collection backed directly by the given map (it is not copied).
         *
         * @param map the pairs to wrap; {@code null} yields a new, empty, mutable map.
         */
        private AttributeCollection(Map<UUID, Long> map) {
            if (map == null) {
                this.attributes = new HashMap<>();
            } else {
                this.attributes = map;
            }
        }

        /**
         * Creates a fresh builder for this type.
         *
         * @return a new, empty builder.
         */
        @SuppressFBWarnings(justification = "generated code")
        public static AttributeCollectionBuilder builder() {
            return new AttributeCollectionBuilder();
        }

        /**
         * Applies every pair of the given collection onto this one: a value of {@code Long.MIN_VALUE} removes the
         * attribute from this collection, any other value is inserted or overwrites the existing one.
         *
         * @param attributeCollection the collection whose pairs are applied; it is not modified.
         */
        public void mergeWith(AttributeCollection attributeCollection) {
            for (Map.Entry<UUID, Long> update : attributeCollection.attributes.entrySet()) {
                UUID attributeId = update.getKey();
                Long newValue = update.getValue();
                if (newValue.longValue() == Long.MIN_VALUE) {
                    this.attributes.remove(attributeId);
                } else {
                    this.attributes.put(attributeId, newValue);
                }
            }
        }

        /** Builder used by the serializer to assemble an {@link AttributeCollection} while deserializing. */
        public static class AttributeCollectionBuilder implements ObjectBuilder<AttributeCollection> {

            @SuppressFBWarnings(justification = "generated code")
            private Map<UUID, Long> attributes;

            @SuppressFBWarnings(justification = "generated code")
            AttributeCollectionBuilder() {
            }

            /**
             * Sets the map the built collection will wrap.
             *
             * @param map the attribute pairs.
             * @return this builder.
             */
            @SuppressFBWarnings(justification = "generated code")
            public AttributeCollectionBuilder attributes(Map<UUID, Long> map) {
                this.attributes = map;
                return this;
            }

            /**
             * Builds the collection from the map set so far.
             *
             * @return a new collection wrapping the configured map (empty if none was set).
             */
            @SuppressFBWarnings(justification = "generated code")
            /* renamed from: build, reason: merged with bridge method [inline-methods] */
            public AttributeCollection m11build() {
                return new AttributeCollection(this.attributes);
            }

            @SuppressFBWarnings(justification = "generated code")
            public String toString() {
                return "SegmentAttributeIndex.AttributeCollection.AttributeCollectionBuilder(attributes=" + this.attributes + ")";
            }
        }

        /** Versioned serializer for {@link AttributeCollection}; declares a single format, version 0 revision 0. */
        public static class AttributeCollectionSerializer extends VersionedSerializer.WithBuilder<AttributeCollection, AttributeCollectionBuilder> {
            AttributeCollectionSerializer() {
            }

            /* renamed from: newBuilder, reason: merged with bridge method [inline-methods] */
            public AttributeCollectionBuilder m12newBuilder() {
                return new AttributeCollectionBuilder();
            }

            protected byte getWriteVersion() {
                return 0;
            }

            protected void declareVersions() {
                version(0).revision(0, this::write00, this::read00);
            }

            /** Writes the attribute map; declares the length up-front (16 bytes per key, 8 per value) if required. */
            private void write00(AttributeCollection source, RevisionDataOutput output) throws IOException {
                if (output.requiresExplicitLength()) {
                    int entryCount = source.attributes.size();
                    output.length(output.getMapLength(entryCount, 16, Long.BYTES));
                }
                output.writeMap(source.attributes, RevisionDataOutput::writeUUID, RevisionDataOutput::writeLong);
            }

            /** Reads the attribute map into a new {@link HashMap} and hands it to the builder. */
            private void read00(RevisionDataInput input, AttributeCollectionBuilder target) throws IOException {
                Map<UUID, Long> pairs = input.readMap(RevisionDataInput::readUUID, RevisionDataInput::readLong, HashMap::new);
                target.attributes(pairs);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/attributes/SegmentAttributeIndex$AttributeSegment.class */
    /** Pairs the storage handle of the attribute segment with its currently known length. */
    public static class AttributeSegment {
        private final SegmentHandle handle;
        private final AtomicLong length;

        /**
         * @param segmentHandle handle of the attribute segment.
         * @param j             initial known length of the attribute segment.
         */
        AttributeSegment(SegmentHandle segmentHandle, long j) {
            this.length = new AtomicLong(j);
            this.handle = segmentHandle;
        }

        /** @return the currently known length. */
        long getLength() {
            return length.get();
        }

        /** Overwrites the known length with {@code j}. */
        void setLength(long j) {
            length.set(j);
        }

        /**
         * Adds {@code i} to the known length.
         *
         * @throws IllegalArgumentException if {@code i} is negative.
         */
        void increaseLength(int i) {
            boolean nonNegative = i >= 0;
            Preconditions.checkArgument(nonNegative, "increase must be non-negative");
            length.addAndGet(i);
        }
    }

    /**
     * Read-result handler that buffers the contents of every entry it is given and, once the read completes,
     * deserializes them as a sequence of {@link AttributeCollection}s merged in order into a single collection,
     * which is then used to complete the supplied future.
     */
    @NotThreadSafe
    private static class AttributeSegmentReader implements AsyncReadResultHandler {
        private final CompletableFuture<AttributeCollection> result;
        private final TimeoutTimer timer;
        private final ArrayList<InputStream> inputs = new ArrayList<>();
        private final AttributeCollection attributeCollection = new AttributeCollection(null);

        /**
         * @param completableFuture future completed with the merged collection, or failed with any error.
         * @param duration          overall time budget; the remaining part is reported as the content-request timeout.
         */
        AttributeSegmentReader(CompletableFuture<AttributeCollection> completableFuture, Duration duration) {
            this.result = completableFuture;
            this.timer = new TimeoutTimer(duration);
        }

        /** Always asks for the contents of every entry. */
        @Override
        public boolean shouldRequestContents(ReadResultEntryType readResultEntryType, long j) {
            return true;
        }

        /** Buffers the (already fetched) contents of the entry; always asks to continue. */
        @Override
        public boolean processEntry(ReadResultEntry readResultEntry) {
            assert readResultEntry.getContent().isDone() : "received incomplete ReadResultEntry from reader";
            this.inputs.add(readResultEntry.getContent().join().getData());
            return true;
        }

        /** Fails the result future with the given error. */
        @Override
        public void processError(Throwable th) {
            this.result.completeExceptionally(th);
        }

        /**
         * Deserializes all buffered data and completes the result future. Any failure (including one raised while
         * closing the stream) is routed to {@link #processError}.
         */
        @Override
        public void processResultComplete() {
            Enumeration<InputStream> remainingInputs = Collections.enumeration(this.inputs);
            // try-with-resources guarantees the stream is closed even when deserialization throws.
            try (SequenceInputStream stream = new SequenceInputStream(remainingInputs)) {
                // available() alone is not enough: it may report 0 at the end of the current input while more
                // inputs are still pending in the enumeration.
                while (remainingInputs.hasMoreElements() || stream.available() > 0) {
                    AttributeCollection next = AttributeCollection.SERIALIZER.deserialize(stream);
                    this.attributeCollection.mergeWith(next);
                }
                this.result.complete(this.attributeCollection);
            } catch (Throwable ex) {
                processError(ex);
            }
        }

        /** @return the time left on this reader's timer. */
        @Override
        public Duration getRequestContentTimeout() {
            return this.timer.getRemaining();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/attributes/SegmentAttributeIndex$CacheEntry.class */
    /**
     * One slot of the attribute cache. The slot's serialized contents live in the enclosing index's {@code cache}
     * under a key made of the segment id and this entry's id; this object tracks the slot's generation and size.
     */
    public class CacheEntry {
        private final int entryId;

        @GuardedBy("this")
        private int generation;

        @GuardedBy("this")
        private int size = 0;

        /**
         * @param i  id of this entry (second component of its cache key).
         * @param i2 initial generation.
         */
        CacheEntry(int i, int i2) {
            this.entryId = i;
            this.generation = i2;
        }

        /** @return a new cache key built from the segment id and this entry's id. */
        CacheKey getKey() {
            long segmentId = SegmentAttributeIndex.this.segmentMetadata.getId();
            return new CacheKey(segmentId, this.entryId);
        }

        synchronized int getGeneration() {
            return this.generation;
        }

        synchronized int getSize() {
            return this.size;
        }

        /**
         * Looks up the requested ids in this entry's cached data and adds what is found to {@code map}. The
         * generation is set to {@code i} only when the lookup reports success.
         *
         * @param collection ids to look up; nothing happens when empty.
         * @param map        receives the values found.
         * @param i          generation to stamp on this entry upon a successful lookup.
         */
        public synchronized void fetchValues(Collection<UUID> collection, Map<UUID, Long> map, int i) {
            if (collection.isEmpty()) {
                return;
            }
            byte[] cached = SegmentAttributeIndex.this.cache.get(getKey());
            if (cached == null || cached.length == 0) {
                return;
            }
            if (CollectionHelpers.binarySearch(CacheEntryLayout.wrap(cached), collection, map)) {
                this.generation = i;
            }
        }

        /**
         * Applies the given updates to this entry's cached data. An update is considered only if the attribute is
         * not cached or is cached with a version lower than {@code j}; a value of {@code Long.MIN_VALUE} removes
         * the attribute. The cache is rewritten (and size and generation refreshed) only if something changed.
         *
         * @param collection updates to apply; nothing happens when empty.
         * @param j          version associated with these updates.
         * @param i          generation to stamp on this entry if the cache is rewritten.
         */
        public synchronized void updateValues(Collection<Map.Entry<UUID, Long>> collection, long j, int i) {
            if (collection.isEmpty()) {
                return;
            }
            byte[] cached = SegmentAttributeIndex.this.cache.get(getKey());
            Map<UUID, VersionedValue> current;
            if (cached == null) {
                current = new HashMap<>();
            } else {
                current = CacheEntryLayout.wrap(cached).getAllValues();
            }

            boolean changed = false;
            for (Map.Entry<UUID, Long> update : collection) {
                UUID attributeId = update.getKey();
                VersionedValue existing = current.get(attributeId);
                if (existing != null && existing.version >= j) {
                    // Cached value is at least as recent as this update; leave it alone.
                    continue;
                }
                long newValue = update.getValue().longValue();
                if (newValue == Long.MIN_VALUE) {
                    current.remove(attributeId);
                    if (existing != null) {
                        changed = true;
                    }
                } else {
                    current.put(attributeId, new VersionedValue(j, newValue));
                    if (existing == null || existing.value != newValue) {
                        changed = true;
                    }
                }
            }

            if (!changed) {
                return;
            }
            byte[] serialized = CacheEntryLayout.setValues(
                    cached,
                    current.entrySet().stream().sorted(Map.Entry.comparingByKey()).iterator(),
                    current.size());
            SegmentAttributeIndex.this.cache.insert(getKey(), serialized);
            this.size = serialized.length;
            this.generation = i;
        }

        /** Removes this entry's data from the cache and resets its size to zero. */
        synchronized void clear() {
            SegmentAttributeIndex.this.cache.remove(getKey());
            this.size = 0;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int getEntryId() {
            return this.entryId;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Immutable record of a write to the attribute segment: the offset it was written at and its length. */
    @VisibleForTesting
    public static class WriteInfo {
        private final long offset;
        private final int length;

        /**
         * @param j offset of the write.
         * @param i length of the write.
         */
        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"offset", "length"})
        public WriteInfo(long j, int i) {
            this.offset = j;
            this.length = i;
        }

        /** @return the offset immediately after the written data (offset + length). */
        long getEndOffset() {
            return this.offset + this.length;
        }

        @SuppressFBWarnings(justification = "generated code")
        public long getOffset() {
            return this.offset;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int getLength() {
            return this.length;
        }

        public String toString() {
            return String.format("Offset=%d, Length=%d", this.offset, this.length);
        }

        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof WriteInfo) {
                WriteInfo that = (WriteInfo) obj;
                return that.canEqual(this)
                        && getOffset() == that.getOffset()
                        && getLength() == that.getLength();
            }
            return false;
        }

        @SuppressFBWarnings(justification = "generated code")
        protected boolean canEqual(Object obj) {
            return obj instanceof WriteInfo;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            int hash = 59 + Long.hashCode(getOffset());
            return hash * 59 + getLength();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SegmentAttributeIndex(SegmentMetadata segmentMetadata, Storage storage, OperationLog operationLog, Cache cache, AttributeIndexConfig attributeIndexConfig, ScheduledExecutorService scheduledExecutorService) {
        this.segmentMetadata = (SegmentMetadata) Preconditions.checkNotNull(segmentMetadata, "segmentMetadata");
        this.storage = (Storage) Preconditions.checkNotNull(storage, "storage");
        this.operationLog = (OperationLog) Preconditions.checkNotNull(operationLog, "operationLog");
        this.cache = (Cache) Preconditions.checkNotNull(cache, "cache");
        this.config = (AttributeIndexConfig) Preconditions.checkNotNull(attributeIndexConfig, "config");
        this.executor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService, "executor");
        this.traceObjectId = String.format("AttributeIndex[%s]", Long.valueOf(this.segmentMetadata.getId()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Opens the attribute segment for writing and records its handle and length; if the segment does not exist it
     * is created first (with the configured rolling policy) and then opened.
     *
     * @param duration timeout for the whole operation.
     * @return a future that completes once the attribute segment has been recorded.
     * @throws IllegalStateException if this index has already been initialized.
     */
    public CompletableFuture<Void> initialize(Duration duration) {
        TimeoutTimer timer = new TimeoutTimer(duration);
        String attributeSegmentName = StreamSegmentNameUtils.getAttributeSegmentName(this.segmentMetadata.getName());
        Preconditions.checkState(this.attributeSegment.get() == null, "SegmentAttributeIndex is already initialized.");

        // Happy path: the attribute segment exists; open it, then fetch its length.
        CompletableFuture<Void> openExisting = this.storage
                .openWrite(attributeSegmentName)
                .thenComposeAsync(
                        handle -> this.storage
                                .getStreamSegmentInfo(attributeSegmentName, timer.getRemaining())
                                .thenAccept(info -> this.attributeSegment.set(new AttributeSegment(handle, info.getLength()))),
                        this.executor);

        // Fallback, used only for StreamSegmentNotExistsException: create the segment, then open it.
        return Futures
                .exceptionallyComposeExpecting(
                        openExisting,
                        ex -> ex instanceof StreamSegmentNotExistsException,
                        () -> this.storage
                                .create(attributeSegmentName, this.config.getAttributeSegmentRollingPolicy(), timer.getRemaining())
                                .thenComposeAsync(
                                        info -> this.storage
                                                .openWrite(attributeSegmentName)
                                                .thenAccept(handle -> this.attributeSegment.set(new AttributeSegment(handle, info.getLength()))),
                                        this.executor))
                .thenRun(() -> log.debug("{}: Initialized (Attribute Segment Length = {}).", this.traceObjectId, this.attributeSegment.get().getLength()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deletes the attribute segment associated with the given segment, if it exists. A missing attribute segment
     * is not treated as an error.
     *
     * @param str      name of the main segment whose attribute segment is to be deleted.
     * @param storage  storage to delete from.
     * @param duration timeout for the operation.
     * @return a future that completes when the attribute segment has been deleted or was found not to exist.
     */
    public static CompletableFuture<Void> delete(String str, Storage storage, Duration duration) {
        TimeoutTimer timer = new TimeoutTimer(duration);
        String attributeSegmentName = StreamSegmentNameUtils.getAttributeSegmentName(str);
        CompletableFuture<Void> deletion = storage
                .openWrite(attributeSegmentName)
                .thenCompose(handle -> storage.delete(handle, timer.getRemaining()));
        return Futures.exceptionallyExpecting(deletion, ex -> ex instanceof StreamSegmentNotExistsException, null);
    }

    /** Closes this index without removing its entries from the cache. */
    @Override
    public void close() {
        final boolean cleanCache = false;
        close(cleanCache);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes this index. Only the first call has any effect.
     *
     * @param z if true, all cache entries are removed on the executor before the close is logged; if false the
     *          cache is left untouched.
     */
    public void close(boolean z) {
        if (!this.closed.compareAndSet(false, true)) {
            // Already closed by an earlier call.
            return;
        }
        if (!z) {
            log.info("{}: Closed (no cache cleanup).", this.traceObjectId);
            return;
        }
        this.executor.execute(() -> {
            removeAllCacheEntries();
            log.info("{}: Closed.", this.traceObjectId);
        });
    }

    /**
     * Detaches every cache entry from this index and clears each one. The entries are collected and the slots
     * emptied while holding the lock; the entries themselves are cleared after the lock is released.
     */
    @VisibleForTesting
    void removeAllCacheEntries() {
        List<CacheEntry> detached = new ArrayList<>();
        synchronized (this.cacheEntries) {
            for (int slot = 0; slot < this.cacheEntries.length; slot++) {
                CacheEntry entry = this.cacheEntries[slot];
                if (entry != null) {
                    detached.add(entry);
                }
                this.cacheEntries[slot] = null;
            }
        }
        for (CacheEntry entry : detached) {
            entry.clear();
        }
        log.info("{}: Cleared all cache entries ({}).", this.traceObjectId, detached.size());
    }

    /**
     * Summarizes the state of this index's cache entries.
     *
     * @return a status carrying the total size of all entries and the oldest and newest generation among them;
     *         when there are no entries all three values are 0.
     */
    @Override
    public CacheManager.CacheStatus getCacheStatus() {
        // Start the minimum at MAX_VALUE: seeding it with 0 would pin the reported oldest generation to 0
        // no matter what generations the entries actually have.
        int oldestGeneration = Integer.MAX_VALUE;
        int newestGeneration = 0;
        long totalSize = 0;
        boolean anyEntry = false;
        synchronized (this.cacheEntries) {
            for (CacheEntry entry : this.cacheEntries) {
                if (entry != null) {
                    int generation = entry.getGeneration();
                    oldestGeneration = Math.min(oldestGeneration, generation);
                    newestGeneration = Math.max(newestGeneration, generation);
                    totalSize += entry.getSize();
                    anyEntry = true;
                }
            }
        }
        if (!anyEntry) {
            // Nothing cached: report the same (0, 0, 0) status as before.
            oldestGeneration = 0;
        }
        return new CacheManager.CacheStatus(totalSize, oldestGeneration, newestGeneration);
    }

    /**
     * Records the current cache generation and evicts every entry whose generation is below the given threshold.
     *
     * @param i  the current generation; remembered by this index.
     * @param i2 the eviction threshold; entries with a generation strictly below it are removed from the cache.
     * @return the combined size of all evicted entries.
     */
    @Override
    public long updateGenerations(int i, int i2) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        long evictedSize = 0;
        synchronized (this.cacheEntries) {
            this.currentCacheGeneration = i;
            int slot = -1;
            for (CacheEntry entry : this.cacheEntries) {
                slot++;
                if (entry == null || entry.getGeneration() >= i2) {
                    continue;
                }
                this.cache.remove(entry.getKey());
                evictedSize += entry.getSize();
                this.cacheEntries[slot] = null;
            }
        }
        return evictedSize;
    }

    /**
     * Stores a single attribute value; shorthand for the map-based overload with a one-entry map.
     *
     * @param uuid     id of the attribute.
     * @param l        value to store.
     * @param duration timeout for the operation.
     * @return a future that completes when the value has been stored.
     */
    public CompletableFuture<Void> put(UUID uuid, Long l, Duration duration) {
        Map<UUID, Long> single = Collections.singletonMap(uuid, l);
        return put(single, duration);
    }

    /**
     * Persists the given attribute values to the attribute segment and then updates the cache with them. If a
     * snapshot is due, the values are written as part of a new snapshot; otherwise they are appended on their own.
     *
     * @param map      attribute id-value pairs to store; an empty map is a no-op.
     * @param duration timeout for the operation.
     * @return a future that completes once the values are written and the cache has been updated.
     * @throws NullPointerException if {@code map} is null.
     */
    @Override
    public CompletableFuture<Void> put(Map<UUID, Long> map, Duration duration) {
        ensureInitialized();
        Preconditions.checkNotNull(map, "values");
        if (map.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        AttributeCollection toWrite = new AttributeCollection(map);
        CompletableFuture<WriteInfo> written;
        if (shouldSnapshot()) {
            written = createSnapshot(toWrite, false, duration);
        } else {
            written = appendConditionally(() -> CompletableFuture.completedFuture(serialize(toWrite)), new TimeoutTimer(duration));
        }
        return written.thenAcceptAsync(info -> updateCache(toWrite, info.getEndOffset()), this.executor);
    }

    /**
     * Retrieves the values of the requested attributes. If every key is served by the cache the cached values are
     * returned directly; otherwise the attribute segment is read (from the last snapshot onward, updating the
     * cache) and the values are taken from there. Attributes without a value are left out of the result.
     *
     * @param collection ids of the attributes to fetch; duplicates are tolerated and looked up once.
     * @param duration   timeout for the operation.
     * @return a future with a map of attribute id to value for the attributes that have one.
     * @throws NullPointerException if {@code collection} is null.
     */
    @Override
    public CompletableFuture<Map<UUID, Long>> get(Collection<UUID> collection, Duration duration) {
        ensureInitialized();
        Preconditions.checkNotNull(collection, "keys");
        if (collection.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        Map<UUID, Long> fromCache = getFromCache(collection);
        if (fromCache.size() == collection.size()) {
            return CompletableFuture.completedFuture(fromCache);
        }

        return readAllSinceLastSnapshot(true, duration).thenApply(all -> {
            ImmutableMap.Builder<UUID, Long> found = ImmutableMap.builder();
            // De-duplicate first: ImmutableMap.Builder rejects a key that is put twice, so a repeated id in the
            // request would otherwise make build() throw.
            collection.stream().distinct().forEach(attributeId -> {
                long value = all.attributes.getOrDefault(attributeId, Long.MIN_VALUE);
                if (value != Long.MIN_VALUE) {
                    found.put(attributeId, value);
                }
            });
            return found.build();
        });
    }

    /**
     * Retrieves the value of a single attribute, from the cache if present there, otherwise by reading the
     * attribute segment (which also updates the cache).
     *
     * @param uuid     id of the attribute.
     * @param duration timeout for the operation.
     * @return a future with the attribute's value, or {@code null} if the attribute segment holds no value for it.
     */
    public CompletableFuture<Long> get(UUID uuid, Duration duration) {
        ensureInitialized();
        Map<UUID, Long> cached = getFromCache(Collections.singleton(uuid));
        if (cached.isEmpty()) {
            return readAllSinceLastSnapshot(true, duration).thenApply(all -> all.attributes.get(uuid));
        }
        return CompletableFuture.completedFuture(cached.get(uuid));
    }

    /**
     * Removes a single attribute; shorthand for the collection-based overload with a one-element collection.
     *
     * @param uuid     id of the attribute to remove.
     * @param duration timeout for the operation.
     * @return a future that completes when the removal has been stored.
     */
    public CompletableFuture<Void> remove(UUID uuid, Duration duration) {
        Collection<UUID> single = Collections.singleton(uuid);
        return remove(single, duration);
    }

    /**
     * Removes the given attributes by storing the removal marker ({@code Long.MIN_VALUE}) as the value of each.
     *
     * @param collection ids of the attributes to remove; duplicates are tolerated.
     * @param duration   timeout for the operation.
     * @return a future that completes when the removals have been stored.
     * @throws NullPointerException if {@code collection} is null.
     */
    @Override
    public CompletableFuture<Void> remove(Collection<UUID> collection, Duration duration) {
        Preconditions.checkNotNull(collection, "keys");
        // The merge function keeps the collector from throwing IllegalStateException when an id is listed twice;
        // every id maps to the same marker, so which duplicate wins is irrelevant.
        Map<UUID, Long> removals = collection.stream()
                .collect(Collectors.toMap(Function.identity(), attributeId -> Long.MIN_VALUE, (first, second) -> first));
        return put(removals, duration);
    }

    /**
     * Writes a final snapshot to the attribute segment and then seals it in storage. An attribute segment that is
     * already sealed is not treated as an error.
     *
     * @param duration timeout for the operation.
     * @return a future that completes when the attribute segment is sealed (or was already sealed).
     */
    @Override
    public CompletableFuture<Void> seal(Duration duration) {
        ensureInitialized();
        TimeoutTimer timer = new TimeoutTimer(duration);
        CompletableFuture<Void> sealed = createSnapshot(new AttributeCollection(), true, timer.getRemaining())
                .thenComposeAsync(
                        snapshotInfo -> this.storage.seal(this.attributeSegment.get().handle, timer.getRemaining()),
                        this.executor)
                .thenRun(() -> log.info("{}: Sealed (Length = {}).", this.traceObjectId, this.attributeSegment.get().getLength()));
        return Futures.exceptionallyExpecting(sealed, ex -> ex instanceof StreamSegmentSealedException, null);
    }

    /**
     * Decides whether a new snapshot is due.
     *
     * @return true if the amount of attribute segment data past the end of the last snapshot has reached the
     *         configured snapshot trigger size.
     */
    @VisibleForTesting
    boolean shouldSnapshot() {
        long segmentLength = this.attributeSegment.get().getLength();
        long lastSnapshotEnd = getLastSnapshotOffset() + (long) getLastSnapshotLength();
        long sinceLastSnapshot = segmentLength - lastSnapshotEnd;
        return sinceLastSnapshot >= this.config.getSnapshotTriggerSize();
    }

    /**
     * Reads the attribute segment from the last snapshot offset up to its currently known length and merges
     * everything read into a single AttributeCollection. The read is run under READ_RETRY, so it is re-attempted
     * when it fails with StreamSegmentTruncatedException.
     *
     * @param z        if true, the cache is updated with the collection that was read.
     * @param duration timeout handed to the segment reader.
     * @return a future with the merged collection. An IOException or StreamSegmentTruncatedException that
     *         escapes the retries is reported as a DataCorruptionException; other failures are passed through
     *         unwrapped.
     */
    private CompletableFuture<AttributeCollection> readAllSinceLastSnapshot(boolean z, Duration duration) {
        // End offset of the range covered by the most recent read attempt; used as the version for the cache update.
        AtomicLong atomicLong = new AtomicLong();
        CompletableFuture runAsync = READ_RETRY.runAsync(() -> {
            ensureMainSegmentExists();
            long lastSnapshotOffset = getLastSnapshotOffset();
            // A single read covers at most Integer.MAX_VALUE bytes.
            int min = (int) Math.min(2147483647L, this.attributeSegment.get().getLength() - lastSnapshotOffset);
            atomicLong.set(lastSnapshotOffset + min);
            CompletableFuture completableFuture = new CompletableFuture();
            if (min == 0) {
                // Nothing to read: the result is an empty collection.
                completableFuture.complete(new AttributeCollection());
            } else {
                // The AttributeSegmentReader completes (or fails) completableFuture once the read is processed.
                AsyncReadResultProcessor.process(StreamSegmentStorageReader.read(this.attributeSegment.get().handle, lastSnapshotOffset, min, this.config.getReadBlockSize(), (ReadOnlyStorage) this.storage), new AttributeSegmentReader(completableFuture, duration), this.executor);
            }
            return completableFuture;
        }, this.executor);
        if (z) {
            runAsync = runAsync.thenApplyAsync(attributeCollection -> {
                updateCache(attributeCollection, atomicLong.get());
                return attributeCollection;
            }, (Executor) this.executor);
        }
        return Futures.exceptionallyCompose(runAsync, th -> {
            StreamingException unwrap = Exceptions.unwrap(th);
            // Parse failures and truncation that survived the retries are surfaced as data corruption.
            if ((unwrap instanceof IOException) || (unwrap instanceof StreamSegmentTruncatedException)) {
                unwrap = new DataCorruptionException(String.format("Unable to parse AttributeSegment. LastSnapshot = (Offset=%d, Length=%d), Known Segment Length = %d.", Long.valueOf(getLastSnapshotOffset()), Integer.valueOf(getLastSnapshotLength()), Long.valueOf(this.attributeSegment.get().getLength())), unwrap, new Object[0]);
            }
            return Futures.failedFuture(unwrap);
        });
    }

    /**
     * Writes a new snapshot to the attribute segment. The snapshot consists of everything read since the last
     * snapshot (without touching the cache) with the given updates merged on top; once appended, the snapshot
     * location is recorded via {@code updateStatePostSnapshot}.
     *
     * @param attributeCollection updates to merge into the snapshot.
     * @param z                   passed through to {@code updateStatePostSnapshot} as its boolean flag.
     * @param duration            timeout for the whole operation.
     * @return a future with the location of the written snapshot.
     */
    private CompletableFuture<WriteInfo> createSnapshot(AttributeCollection attributeCollection, boolean z, Duration duration) {
        TimeoutTimer timer = new TimeoutTimer(duration);
        // Re-evaluated on every append attempt, so each attempt serializes a freshly read state.
        Supplier<CompletableFuture<ArrayView>> snapshotData = () -> readAllSinceLastSnapshot(false, timer.getRemaining())
                .thenApplyAsync(existing -> {
                    existing.mergeWith(attributeCollection);
                    return serialize(existing);
                }, this.executor);
        return appendConditionally(snapshotData, timer)
                .thenComposeAsync(written -> updateStatePostSnapshot(written, z, timer), this.executor);
    }

    /**
     * Records the location of a freshly written snapshot in the main segment's attributes (via the operation log)
     * and, if that succeeds, truncates the attribute segment at the snapshot's offset.
     *
     * @param writeInfo    location (offset and length) of the snapshot that was written.
     * @param z            if true, a failure to record the location or to truncate fails the returned future;
     *                     if false, such a failure is only logged.
     * @param timeoutTimer timer supplying the remaining timeout for each step.
     * @return a future that yields {@code writeInfo} once the steps above are done.
     */
    private CompletableFuture<WriteInfo> updateStatePostSnapshot(WriteInfo writeInfo, boolean z, TimeoutTimer timeoutTimer) {
        log.debug("{}: Snapshot serialized to attribute segment ({}).", this.traceObjectId, writeInfo);
        // The offset uses ReplaceIfGreater and the length uses Replace.
        CompletableFuture thenCompose = this.operationLog.add(new UpdateAttributesOperation(this.segmentMetadata.getId(), Arrays.asList(new AttributeUpdate(Attributes.LAST_ATTRIBUTE_SNAPSHOT_OFFSET, AttributeUpdateType.ReplaceIfGreater, writeInfo.offset), new AttributeUpdate(Attributes.LAST_ATTRIBUTE_SNAPSHOT_LENGTH, AttributeUpdateType.Replace, writeInfo.length))), timeoutTimer.getRemaining()).handleAsync((r9, th) -> {
            if (th == null) {
                log.debug("{}: Snapshot location updated in main segment's metadata ({}).", this.traceObjectId, writeInfo);
                ensureMainSegmentExists();
                // Everything before the new snapshot is no longer needed.
                return this.storage.truncate(this.attributeSegment.get().handle, writeInfo.offset, timeoutTimer.getRemaining());
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (!(unwrap instanceof StreamSegmentMergedException) && !(unwrap instanceof StreamSegmentSealedException) && !(unwrap instanceof StreamSegmentNotExistsException)) {
                return Futures.failedFuture(unwrap);
            }
            // Main segment merged, sealed or gone: log and carry on without truncating.
            log.warn("{}: Snapshot serialized to attribute segment, but failed to update snapshot location due to {}.", this.traceObjectId, unwrap.toString());
            return CompletableFuture.completedFuture(null);
        }, (Executor) this.executor).thenCompose((Function<? super U, ? extends CompletionStage<U>>) completableFuture -> {
            // Flatten the future returned by the handler above.
            return completableFuture;
        });
        if (!z) {
            // Best-effort mode: the snapshot itself was written, so only log what went wrong afterwards.
            thenCompose = thenCompose.exceptionally(th2 -> {
                log.warn("{}: Snapshot serialized to attribute segment, but failed to update snapshot location or truncate Attribute Segment.", this.traceObjectId, Exceptions.unwrap(th2));
                return null;
            });
        }
        return thenCompose.thenApply(r3 -> {
            return writeInfo;
        });
    }

    /**
     * Runs {@link #appendConditionallyOnce} under the {@code APPEND_RETRY} policy, using this
     * instance's executor.
     *
     * @param supplier     produces the data to append; passed through to each attempt.
     * @param timeoutTimer timer providing the remaining time for the operation.
     * @return the future returned by {@code APPEND_RETRY.runAsync}.
     */
    private CompletableFuture<WriteInfo> appendConditionally(Supplier<CompletableFuture<ArrayView>> supplier, TimeoutTimer timeoutTimer) {
        return APPEND_RETRY.runAsync(() -> appendConditionallyOnce(supplier, timeoutTimer), this.executor);
    }

    /**
     * Makes a single attempt to append the supplied data at the currently known end of the attribute segment.
     * <p>
     * The known length of the attribute segment is captured before the supplier is invoked and is used
     * as the write offset. On success the known length is increased by the number of bytes written.
     * If the write fails with a {@link BadOffsetException}, the known length is refreshed from Storage
     * and the original failure is then propagated; any other failure is propagated as-is.
     *
     * @param supplier     produces the data to append.
     * @param timeoutTimer timer providing the remaining time for each Storage call.
     * @return a future that completes with the offset and length of the data written.
     */
    private CompletableFuture<WriteInfo> appendConditionallyOnce(Supplier<CompletableFuture<ArrayView>> supplier, TimeoutTimer timeoutTimer) {
        AttributeSegment segment = this.attributeSegment.get();
        ensureMainSegmentExists();

        // Capture the offset before asking the supplier for data.
        long offset = segment.getLength();
        return supplier.get().thenComposeAsync(data -> Futures.exceptionallyCompose(
                this.storage.write(segment.handle, offset, data.getReader(), data.getLength(), timeoutTimer.getRemaining())
                        .thenApply(ignored -> {
                            segment.increaseLength(data.getLength());
                            log.debug("{}: Wrote data ({}).", this.traceObjectId, data.getLength());
                            return new WriteInfo(offset, data.getLength());
                        }),
                th -> {
                    if (Exceptions.unwrap(th) instanceof BadOffsetException) {
                        // Refresh the length of the same segment instance we attempted to write to, then
                        // re-fail with the original exception so the caller sees the BadOffsetException.
                        return this.storage.getStreamSegmentInfo(segment.handle.getSegmentName(), timeoutTimer.getRemaining())
                                .thenCompose(info -> {
                                    segment.setLength(info.getLength());
                                    return Futures.failedFuture(th);
                                });
                    }
                    return Futures.failedFuture(th);
                }), this.executor); // Explicit executor: without it the continuation runs on the common pool.
    }

    /**
     * Serializes the given {@link AttributeCollection} using {@code AttributeCollection.SERIALIZER}.
     *
     * @param attributeCollection the collection to serialize.
     * @return the serialized form.
     * @throws java.io.UncheckedIOException if the serializer reports an {@link IOException}; the original
     *                                      exception is preserved as the cause. This method declares no
     *                                      checked exceptions, so the IOException cannot be rethrown directly.
     */
    @VisibleForTesting
    ArrayView serialize(AttributeCollection attributeCollection) {
        try {
            return AttributeCollection.SERIALIZER.serialize(attributeCollection);
        } catch (IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }

    /**
     * Gets the handle of the current attribute segment.
     *
     * @return the {@code handle} field of the attribute segment currently referenced by this index.
     */
    @VisibleForTesting
    SegmentHandle getAttributeSegmentHandle() {
        final SegmentHandle currentHandle = this.attributeSegment.get().handle;
        return currentHandle;
    }

    /**
     * Reads {@code LAST_ATTRIBUTE_SNAPSHOT_OFFSET} from the main segment's attributes.
     *
     * @return the stored value, or 0 if the attribute is absent.
     */
    private long getLastSnapshotOffset() {
        final Long storedOffset = this.segmentMetadata.getAttributes()
                .getOrDefault(Attributes.LAST_ATTRIBUTE_SNAPSHOT_OFFSET, 0L);
        return storedOffset;
    }

    /**
     * Reads {@code LAST_ATTRIBUTE_SNAPSHOT_LENGTH} from the main segment's attributes.
     *
     * @return the stored value narrowed to an int, or 0 if the attribute is absent.
     */
    private int getLastSnapshotLength() {
        final Long storedLength = this.segmentMetadata.getAttributes()
                .getOrDefault(Attributes.LAST_ATTRIBUTE_SNAPSHOT_LENGTH, 0L);
        // The attribute is stored as a long; intValue() performs the same narrowing as an (int) cast.
        return storedLength.intValue();
    }

    /**
     * Verifies that the main segment has been neither deleted nor merged, according to its metadata.
     * If either flag is set, the condition is logged and a {@link StreamSegmentNotExistsException}
     * carrying the segment's name is thrown.
     */
    private void ensureMainSegmentExists() {
        final boolean gone = this.segmentMetadata.isDeleted() || this.segmentMetadata.isMerged();
        if (!gone) {
            return;
        }

        // NOTE(review): this method has no throws clause; if StreamSegmentNotExistsException is a checked
        // exception, the original source presumably relied on a sneaky-throw mechanism - confirm.
        log.info("{}: Main Segment ({}) is Deleted. Aborting operation.", this.traceObjectId, this.segmentMetadata.getName());
        throw new StreamSegmentNotExistsException(this.segmentMetadata.getName());
    }

    /**
     * Pushes the attributes of the given collection into the cache.
     * <p>
     * Attributes are grouped by cache bucket. While holding the lock on {@code cacheEntries}, the
     * current cache generation is read and the {@code CacheEntry} for each bucket is looked up,
     * creating it if missing. The entries themselves are updated after the lock is released.
     *
     * @param attributeCollection the attributes to cache; nothing happens if it has no attributes.
     * @param j                   value passed through to {@code CacheEntry.updateValues} along with the
     *                            attribute values (presumably the segment offset they were read at or
     *                            written to - confirm against callers).
     */
    private void updateCache(AttributeCollection attributeCollection, long j) {
        if (attributeCollection.attributes.isEmpty()) {
            return;
        }

        Map<Integer, List<Map.Entry<UUID, Long>>> valuesByBucket = hash(attributeCollection.attributes);
        Map<CacheEntry, List<Map.Entry<UUID, Long>>> toUpdate = new HashMap<>();
        final int generation;
        synchronized (this.cacheEntries) {
            generation = this.currentCacheGeneration;
            valuesByBucket.forEach((bucketId, values) -> {
                int bucket = bucketId;
                CacheEntry entry = this.cacheEntries[bucket];
                if (entry == null) {
                    entry = new CacheEntry(bucket, generation);
                    this.cacheEntries[bucket] = entry;
                }
                toUpdate.put(entry, values);
            });
        }

        // Entry updates happen outside the lock on cacheEntries.
        toUpdate.forEach((entry, values) -> entry.updateValues(values, j, generation));
    }

    /**
     * Looks up the given attribute ids in the cache.
     * <p>
     * Ids are grouped by cache bucket. While holding the lock on {@code cacheEntries}, the current cache
     * generation is read and the existing {@code CacheEntry} for each bucket is collected; buckets
     * without an entry are skipped. Values are fetched from the entries after the lock is released.
     *
     * @param collection the attribute ids to look up.
     * @return an immutable empty map if {@code collection} is empty; otherwise a new map populated by
     *         {@code CacheEntry.fetchValues} for every bucket that has a cache entry.
     */
    private Map<UUID, Long> getFromCache(Collection<UUID> collection) {
        if (collection.isEmpty()) {
            return Collections.emptyMap();
        }

        Map<Integer, List<UUID>> keysByBucket = hash(collection);
        Map<CacheEntry, List<UUID>> toFetch = new HashMap<>();
        final int generation;
        synchronized (this.cacheEntries) {
            generation = this.currentCacheGeneration;
            keysByBucket.forEach((bucketId, keys) -> {
                CacheEntry entry = this.cacheEntries[bucketId];
                if (entry != null) {
                    toFetch.put(entry, keys);
                }
            });
        }

        // Entry reads happen outside the lock on cacheEntries.
        Map<UUID, Long> result = new HashMap<>();
        toFetch.forEach((entry, keys) -> entry.fetchValues(keys, result, generation));
        return result;
    }

    /**
     * Groups the entries of the given map by cache bucket.
     *
     * @param map the attribute id/value pairs to group.
     * @return the entries of {@code map}, keyed by {@code HASH.hashToBucket(key, CACHE_BUCKETS)}.
     */
    private Map<Integer, List<Map.Entry<UUID, Long>>> hash(Map<UUID, Long> map) {
        return map.entrySet().stream()
                .collect(Collectors.groupingBy(e -> HASH.hashToBucket(e.getKey(), CACHE_BUCKETS)));
    }

    /**
     * Groups the given attribute ids by cache bucket.
     *
     * @param collection the attribute ids to group.
     * @return the ids, keyed by {@code HASH.hashToBucket(id, CACHE_BUCKETS)}.
     */
    private Map<Integer, List<UUID>> hash(Collection<UUID> collection) {
        return collection.stream()
                .collect(Collectors.groupingBy(id -> HASH.hashToBucket(id, CACHE_BUCKETS)));
    }

    /**
     * Maps each of the given attribute ids to its cache bucket.
     *
     * @param collection the attribute ids to map.
     * @return a map from each id to {@code HASH.hashToBucket(id, CACHE_BUCKETS)}.
     */
    @VisibleForTesting
    Map<UUID, Integer> getBuckets(Collection<UUID> collection) {
        return collection.stream()
                .collect(Collectors.toMap(id -> id, id -> HASH.hashToBucket(id, CACHE_BUCKETS)));
    }

    /**
     * Verifies that the attribute segment reference has been set.
     * Fails the {@link Preconditions#checkState} check if it is still null.
     */
    private void ensureInitialized() {
        final boolean initialized = this.attributeSegment.get() != null;
        Preconditions.checkState(initialized, "SegmentAttributeIndex is not initialized.");
    }
}
