package io.pravega.segmentstore.server.containers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.ExceptionHelpers;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.common.util.AsyncMap;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.contracts.StreamSegmentExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentInformation;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.TooManyActiveSegmentsException;
import io.pravega.segmentstore.server.ContainerMetadata;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.OperationLog;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentMapOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentMapping;
import io.pravega.segmentstore.server.logs.operations.TransactionMapOperation;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/server/containers/StreamSegmentMapper.class */
public class StreamSegmentMapper {

    // Shared SLF4J logger for this class; assigned in the static initializer block of this class.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Prefix used in trace/log calls; the constructor formats it as "StreamSegmentMapper[<containerId>]".
    private final String traceObjectId;
    // Queried for the ids and metadata of Segments (getStreamSegmentId / getStreamSegmentMetadata).
    private final ContainerMetadata containerMetadata;
    // Receives the StreamSegmentMapOperation / TransactionMapOperation entries created by submitToOperationLog.
    private final OperationLog durableLog;
    // Asynchronous store of SegmentState objects, keyed by Segment name.
    private final AsyncMap<String, SegmentState> stateStore;
    // Invoked by retryWithCleanup when an attempt fails with TooManyActiveSegmentsException.
    private final Supplier<CompletableFuture<Void>> metadataCleanup;
    // Used to create Segments and to fetch their SegmentProperties.
    private final Storage storage;
    // Runs the asynchronous continuations and the id-assignment tasks started by this class.
    private final Executor executor;

    // Callbacks waiting for a Segment id to be assigned, keyed by Segment name.
    @GuardedBy("assignmentLock")
    private final HashMap<String, PendingRequest> pendingRequests;
    // Monitor protecting pendingRequests.
    private final Object assignmentLock = new Object();
    // Decompiled form of the compiler's assertion switch: true when assertions are disabled for this class
    // (set in the static initializer). Methods check it before throwing AssertionError.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * An ordered collection of callbacks that are all waiting on the same outcome. The callbacks are
     * invoked in the order in which they were added.
     */
    @NotThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/containers/StreamSegmentMapper$PendingRequest.class */
    public static class PendingRequest {
        private final ArrayList<QueuedCallback<?>> callbacks = new ArrayList<>();

        private PendingRequest() {
        }

        /**
         * Invokes every queued callback with the given value. A callback that throws is failed with
         * the thrown exception; the remaining callbacks are still invoked.
         *
         * @param j The value to hand to each callback.
         */
        void complete(long j) {
            for (QueuedCallback<?> queued : this.callbacks) {
                try {
                    queued.complete(j);
                } catch (Throwable failure) {
                    queued.completeExceptionally(failure);
                }
            }
        }

        /**
         * Fails every queued callback with the given exception.
         *
         * @param th The failure cause.
         */
        void completeExceptionally(Throwable th) {
            for (QueuedCallback<?> queued : this.callbacks) {
                queued.completeExceptionally(th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/containers/StreamSegmentMapper$QueuedCallback.class */
    /**
     * Pairs a deferred callback with the future that reports its outcome.
     *
     * @param <T> Type of the result produced by the callback.
     */
    public static class QueuedCallback<T> {
        /** Future handed back to the caller that queued the callback. */
        final CompletableFuture<T> result = new CompletableFuture<>();
        /** Function to invoke once the awaited value is known. */
        final Function<Long, CompletableFuture<T>> callback;

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"callback"})
        public QueuedCallback(Function<Long, CompletableFuture<T>> function) {
            this.callback = function;
        }

        /**
         * Invokes the callback with the given value and ties its outcome to {@link #result} through
         * FutureHelpers.completeAfter (presumably result completes when the callback's future does; confirm there).
         *
         * @param j The value to pass to the callback.
         */
        void complete(long j) {
            FutureHelpers.completeAfter(() -> this.callback.apply(Long.valueOf(j)), this.result);
        }

        /**
         * Fails {@link #result} with the given exception without invoking the callback.
         *
         * @param th The failure cause.
         */
        void completeExceptionally(Throwable th) {
            this.result.completeExceptionally(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/containers/StreamSegmentMapper$SegmentInfo.class */
    /**
     * Immutable value object holding a Segment id together with that Segment's properties.
     */
    public static class SegmentInfo {
        private final long segmentId;
        private final SegmentProperties properties;

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"segmentId", "properties"})
        public SegmentInfo(long j, SegmentProperties segmentProperties) {
            this.segmentId = j;
            this.properties = segmentProperties;
        }

        /** @return The Segment id stored in this object. */
        @SuppressFBWarnings(justification = "generated code")
        public long getSegmentId() {
            return this.segmentId;
        }

        /** @return The Segment properties stored in this object (may be null). */
        @SuppressFBWarnings(justification = "generated code")
        public SegmentProperties getProperties() {
            return this.properties;
        }

        /**
         * Two instances are equal when both agree via {@link #canEqual}, their ids match and their
         * properties are either both null or equal.
         */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof SegmentInfo)) {
                return false;
            }
            SegmentInfo other = (SegmentInfo) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            if (getSegmentId() != other.getSegmentId()) {
                return false;
            }
            SegmentProperties mine = getProperties();
            SegmentProperties theirs = other.getProperties();
            if (mine == null) {
                return theirs == null;
            }
            return mine.equals(theirs);
        }

        /** Hook that lets subclasses opt out of equality with instances of this class. */
        @SuppressFBWarnings(justification = "generated code")
        protected boolean canEqual(Object obj) {
            return obj instanceof SegmentInfo;
        }

        /** Combines the id (folded to 32 bits) and the properties hash using the multiplier 59. */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            final int prime = 59;
            long id = getSegmentId();
            int hash = prime + (int) (id ^ (id >>> 32));
            SegmentProperties props = getProperties();
            int propsHash = (props == null) ? 43 : props.hashCode();
            return hash * prime + propsHash;
        }

        @Override
        @SuppressFBWarnings(justification = "generated code")
        public String toString() {
            StringBuilder text = new StringBuilder("StreamSegmentMapper.SegmentInfo(segmentId=");
            text.append(getSegmentId());
            text.append(", properties=");
            text.append(getProperties());
            text.append(")");
            return text.toString();
        }
    }

    /**
     * Creates a new instance of the StreamSegmentMapper class.
     *
     * @param containerMetadata The Container Metadata to query for Segment ids and metadata.
     * @param operationLog      The log to which mapping operations are added.
     * @param asyncMap          The store holding SegmentState objects, keyed by Segment name.
     * @param supplier          Callback that triggers a metadata cleanup.
     * @param storage           The Storage used to create and look up Segments.
     * @param executor          The executor for asynchronous work.
     * @throws NullPointerException If any argument is null.
     */
    public StreamSegmentMapper(ContainerMetadata containerMetadata, OperationLog operationLog, AsyncMap<String, SegmentState> asyncMap, Supplier<CompletableFuture<Void>> supplier, Storage storage, Executor executor) {
        // checkNotNull returns its argument, so validation and assignment are done together.
        // The order of the checks matches the order of the parameters.
        this.containerMetadata = Preconditions.checkNotNull(containerMetadata, "containerMetadata");
        this.durableLog = Preconditions.checkNotNull(operationLog, "durableLog");
        this.stateStore = Preconditions.checkNotNull(asyncMap, "stateStore");
        this.metadataCleanup = Preconditions.checkNotNull(supplier, "metadataCleanup");
        this.storage = Preconditions.checkNotNull(storage, "storage");
        this.executor = Preconditions.checkNotNull(executor, "executor");
        this.pendingRequests = new HashMap<>();
        this.traceObjectId = String.format("StreamSegmentMapper[%d]", Integer.valueOf(containerMetadata.getContainerId()));
    }

    /**
     * Creates a new StreamSegment, unless the Container Metadata already has an id mapped to that name.
     *
     * @param str        The name of the StreamSegment to create.
     * @param collection Attribute updates to store with the new Segment's state (may be null).
     * @param duration   Timeout for the operation.
     * @return A future that completes when the Segment has been created. It fails with
     * StreamSegmentExistsException when the metadata already holds a valid id for the name.
     */
    public CompletableFuture<Void> createNewStreamSegment(String str, Collection<AttributeUpdate> collection, Duration duration) {
        final long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "createNewStreamSegment", new Object[]{str});

        // Quick fail: a valid id in the metadata means the Segment is already known.
        final long knownId = this.containerMetadata.getStreamSegmentId(str, true);
        if (isValidStreamSegmentId(knownId)) {
            return FutureHelpers.failedFuture(new StreamSegmentExistsException(str));
        }

        TimeoutTimer timer = new TimeoutTimer(duration);
        CompletableFuture<Void> creation = createSegmentInStorageWithRecovery(str, collection, timer);
        if (!log.isTraceEnabled()) {
            return creation;
        }

        // The trace continuation is attached on the side; callers get the original future.
        creation.thenAccept(ignored -> LoggerHelpers.traceLeave(log, this.traceObjectId, "createNewStreamSegment", traceId, new Object[]{str}));
        return creation;
    }

    /**
     * Creates a new Transaction StreamSegment for an existing parent StreamSegment.
     *
     * @param str        The name of the parent StreamSegment. It must not itself be a Transaction name.
     * @param uuid       The id used to derive the Transaction's name from the parent name.
     * @param collection Attribute updates to store with the new Transaction's state (may be null).
     * @param duration   Timeout for the operation.
     * @return A future that completes with the name of the created Transaction. It fails with
     * IllegalArgumentException when the parent is deleted or sealed.
     */
    public CompletableFuture<String> createNewTransactionStreamSegment(String str, UUID uuid, Collection<AttributeUpdate> collection, Duration duration) {
        long traceId = LoggerHelpers.traceEnterWithContext(log, this.traceObjectId, "createNewTransactionStreamSegment", new Object[]{str});
        Exceptions.checkArgument(StreamSegmentNameUtils.getParentStreamSegmentName(str) == null, "parentStreamSegmentName", "Cannot create a Transaction for a Transaction.", new Object[0]);
        TimeoutTimer timer = new TimeoutTimer(duration);

        // Prefer the in-memory metadata for validating the parent; fall back to Storage when the
        // parent is not mapped (or has no metadata entry).
        CompletableFuture<Void> parentCheck = null;
        long parentId = this.containerMetadata.getStreamSegmentId(str, true);
        if (isValidStreamSegmentId(parentId)) {
            SegmentMetadata parentMetadata = this.containerMetadata.getStreamSegmentMetadata(parentId);
            if (parentMetadata != null) {
                parentCheck = validateParentSegmentEligibility(parentMetadata);
            }
        }
        if (parentCheck == null) {
            parentCheck = this.storage.getStreamSegmentInfo(str, timer.getRemaining()).thenCompose(this::validateParentSegmentEligibility);
        }

        String transactionName = StreamSegmentNameUtils.getTransactionNameFromId(str, uuid);
        return parentCheck
                .thenComposeAsync(v -> createSegmentInStorageWithRecovery(transactionName, collection, timer), this.executor)
                .thenApply(v -> {
                    LoggerHelpers.traceLeave(log, this.traceObjectId, "createNewTransactionStreamSegment", traceId, new Object[]{str, transactionName});
                    return transactionName;
                });
    }

    /**
     * Creates the Segment in Storage and then writes its initial state to the State Store. A failed
     * Storage create is routed through {@link #handleStorageCreateException}.
     *
     * @param str          The name of the Segment to create.
     * @param collection   Attribute updates to include in the stored state (may be null).
     * @param timeoutTimer Timer tracking the remaining time for the operation.
     * @return A future that completes once the state has been stored. It fails with
     * StreamSegmentExistsException when the Segment reported by Storage has a length greater than zero.
     */
    private CompletableFuture<Void> createSegmentInStorageWithRecovery(String str, Collection<AttributeUpdate> collection, TimeoutTimer timeoutTimer) {
        return FutureHelpers
                .exceptionallyCompose(
                        this.storage.create(str, timeoutTimer.getRemaining()),
                        failure -> handleStorageCreateException(str, ExceptionHelpers.getRealException(failure), timeoutTimer))
                .thenComposeAsync(created -> {
                    SegmentState initialState = getState(created, collection);
                    return this.stateStore.put(str, initialState, timeoutTimer.getRemaining()).thenRun(() -> {
                        // The state is written first; only then is a non-empty Segment reported as existing.
                        boolean hasData = created.getLength() > 0;
                        if (hasData) {
                            throw new CompletionException(new StreamSegmentExistsException(str));
                        }
                    });
                }, this.executor);
    }

    /**
     * Decides what to do after a failed Storage create. Anything other than StreamSegmentExistsException is
     * passed through as a failure. For an existing Segment the State Store is consulted: if the state is
     * missing or corrupt, the Segment's info is fetched from Storage so the state can be recreated;
     * otherwise the original exception is returned.
     *
     * @param str          The name of the Segment.
     * @param th           The (unwrapped) exception thrown by the Storage create.
     * @param timeoutTimer Timer tracking the remaining time for the operation.
     * @return A future with the Segment's properties, or a failed future.
     */
    private CompletableFuture<SegmentProperties> handleStorageCreateException(String str, Throwable th, TimeoutTimer timeoutTimer) {
        if (!(th instanceof StreamSegmentExistsException)) {
            return FutureHelpers.failedFuture(th);
        }

        return this.stateStore.get(str, timeoutTimer.getRemaining())
                .exceptionally(stateFailure -> {
                    Throwable cause = ExceptionHelpers.getRealException(stateFailure);
                    boolean recoverable = cause instanceof StreamSegmentNotExistsException || cause instanceof DataCorruptionException;
                    if (recoverable) {
                        log.warn("{}: Missing or corrupt State File for existing Segment '{}'; recreating.", new Object[]{this.traceObjectId, str, cause});
                        // A null state tells the next stage to recreate it from Storage.
                        return null;
                    }
                    throw new CompletionException(cause);
                })
                .thenComposeAsync(
                        state -> state != null ? FutureHelpers.failedFuture(th) : this.storage.getStreamSegmentInfo(str, timeoutTimer.getRemaining()),
                        this.executor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the id of the given StreamSegment, starting an id assignment if the Container Metadata has
     * none, and invokes the given callback with the id.
     *
     * @param str      The name of the StreamSegment.
     * @param duration Timeout for the operation.
     * @param function Callback to invoke with the Segment id; its future becomes the result.
     * @param <T>      Type of the callback's result.
     * @return A future with the callback's result. It fails with StreamSegmentNotExistsException when the
     * metadata marks the Segment as deleted.
     * @throws NullPointerException If function is null.
     */
    public <T> CompletableFuture<T> getOrAssignStreamSegmentId(String str, Duration duration, Function<Long, CompletableFuture<T>> function) {
        Preconditions.checkNotNull(function, "thenCompose");
        long existingId = this.containerMetadata.getStreamSegmentId(str, true);
        if (isValidStreamSegmentId(existingId)) {
            if (this.containerMetadata.getStreamSegmentMetadata(existingId).isDeleted()) {
                return FutureHelpers.failedFuture(new StreamSegmentNotExistsException(str));
            }

            // The id is known, but callbacks queued earlier for this Segment may not have run yet.
            // If a PendingRequest exists, join it so this callback runs after the ones already queued.
            QueuedCallback<T> piggyback = null;
            synchronized (this.assignmentLock) {
                PendingRequest pending = this.pendingRequests.get(str);
                if (pending != null) {
                    piggyback = new QueuedCallback<>(function);
                    pending.callbacks.add(piggyback);
                }
            }
            return piggyback == null ? function.apply(Long.valueOf(existingId)) : piggyback.result;
        }

        // No id yet: queue the callback. Only the caller that creates the PendingRequest starts the assignment.
        boolean startAssignment = false;
        QueuedCallback<T> queued = new QueuedCallback<>(function);
        synchronized (this.assignmentLock) {
            PendingRequest pending = this.pendingRequests.get(str);
            if (pending == null) {
                startAssignment = true;
                pending = new PendingRequest();
                this.pendingRequests.put(str, pending);
            }
            pending.callbacks.add(queued);
        }

        if (startAssignment) {
            // A non-null parent name means the name denotes a Transaction.
            String parentName = StreamSegmentNameUtils.getParentStreamSegmentName(str);
            if (parentName == null) {
                this.executor.execute(() -> assignStreamSegmentId(str, duration));
            } else {
                this.executor.execute(() -> assignTransactionStreamSegmentId(str, parentName, duration));
            }
        }
        return queued.result;
    }

    /**
     * Convenience overload of {@link #getOrAssignStreamSegmentId(String, Duration, Function)} whose
     * result is the Segment id itself.
     *
     * @param str      The name of the StreamSegment.
     * @param duration Timeout for the operation.
     * @return A future that completes with the Segment id.
     */
    @VisibleForTesting
    public CompletableFuture<Long> getOrAssignStreamSegmentId(String str, Duration duration) {
        // The callback simply wraps the id in an already-completed future.
        return getOrAssignStreamSegmentId(str, duration, CompletableFuture::completedFuture);
    }

    /**
     * Assigns an id to a Transaction StreamSegment. The parent's id is resolved (or assigned) first, then
     * the Transaction's info and stored attributes are loaded and the mapping is submitted.
     *
     * @param str      The name of the Transaction StreamSegment.
     * @param str2     The name of the parent StreamSegment.
     * @param duration Timeout for the operation.
     * @return A future with the id assigned to the Transaction. On failure, callbacks pending for the
     * Transaction are failed through {@link #withFailureHandler}.
     */
    private CompletableFuture<Long> assignTransactionStreamSegmentId(String str, String str2, Duration duration) {
        TimeoutTimer timer = new TimeoutTimer(duration);
        // Carries the parent's id from the first stage of the chain to the last one.
        AtomicReference<Long> parentId = new AtomicReference<>();
        CompletableFuture<Long> assignment = getOrAssignStreamSegmentId(str2, timer.getRemaining(), id -> {
                    parentId.set(id);
                    return this.storage.getStreamSegmentInfo(str, timer.getRemaining());
                })
                .thenCompose(transactionProperties -> retrieveAttributes(transactionProperties, timer.getRemaining()))
                .thenCompose(transactionInfo -> assignTransactionStreamSegmentId(transactionInfo, parentId.get().longValue(), timer.getRemaining()));
        return withFailureHandler(assignment, str);
    }

    /**
     * Submits the mapping of a Transaction StreamSegment to its parent.
     *
     * @param segmentInfo Info about the Transaction StreamSegment.
     * @param j           The id of the parent StreamSegment.
     * @param duration    Timeout for the operation.
     * @return A future with the id assigned to the Transaction.
     */
    private CompletableFuture<Long> assignTransactionStreamSegmentId(SegmentInfo segmentInfo, long j, Duration duration) {
        // Decompiled asserts: these checks only run when assertions are enabled for this class.
        if (!$assertionsDisabled) {
            if (segmentInfo == null) {
                throw new AssertionError("transInfo is null");
            }
            if (j == Long.MIN_VALUE) {
                throw new AssertionError("parentStreamSegmentId is invalid.");
            }
        }
        return submitToOperationLogWithRetry(segmentInfo, j, duration);
    }

    /**
     * Assigns an id to a stand-alone (non-Transaction) StreamSegment: loads its info from Storage, loads its
     * stored attributes and submits the mapping. The outcome is delivered to the callbacks pending for the
     * Segment, not to the caller.
     *
     * @param str      The name of the StreamSegment.
     * @param duration Timeout for the operation.
     */
    private void assignStreamSegmentId(String str, Duration duration) {
        TimeoutTimer timer = new TimeoutTimer(duration);
        CompletableFuture<Long> assignment = this.storage.getStreamSegmentInfo(str, timer.getRemaining())
                .thenComposeAsync(properties -> retrieveAttributes(properties, timer.getRemaining()), this.executor)
                // Long.MIN_VALUE is the "no parent" marker, see isValidStreamSegmentId.
                .thenComposeAsync(info -> submitToOperationLogWithRetry(info, Long.MIN_VALUE, timer.getRemaining()), this.executor);
        withFailureHandler(assignment, str);
    }

    /**
     * Builds the SegmentState to store for a newly created Segment, merging the given attribute updates
     * into the attributes reported by the source properties.
     *
     * @param segmentProperties The properties of the Segment.
     * @param collection        Attribute updates to apply on top of the existing attributes. When null,
     *                          the properties are used unchanged.
     * @return A SegmentState with no Segment id assigned (Long.MIN_VALUE).
     */
    private SegmentState getState(SegmentProperties segmentProperties, Collection<AttributeUpdate> collection) {
        if (collection == null) {
            // Nothing to merge.
            return new SegmentState(Long.MIN_VALUE, segmentProperties);
        }

        // Copy the existing attributes so the source properties are not modified, then apply each update;
        // a later update for the same attribute id overwrites an earlier one.
        // NOTE(review): assumes AttributeUpdate exposes getAttributeId()/getValue() and that attributes are
        // keyed by UUID with Long values - confirm against the contracts module.
        HashMap<UUID, Long> merged = new HashMap<>(segmentProperties.getAttributes());
        collection.forEach(update -> merged.put(update.getAttributeId(), update.getValue()));
        return new SegmentState(Long.MIN_VALUE, new StreamSegmentInformation(segmentProperties, merged));
    }

    /**
     * Loads the stored state for the given Segment and combines it with the Segment's properties.
     *
     * @param segmentProperties The properties of the Segment, as obtained from Storage.
     * @param duration          Timeout for the State Store lookup.
     * @return A future with a SegmentInfo. Without stored state it carries no id (Long.MIN_VALUE) and the
     * properties unchanged; otherwise it carries the stored id and attributes. The future fails with a
     * DataCorruptionException when the stored state names a different Segment.
     */
    private CompletableFuture<SegmentInfo> retrieveAttributes(SegmentProperties segmentProperties, Duration duration) {
        return this.stateStore.get(segmentProperties.getName(), duration).thenApply(state -> {
            if (state == null) {
                return new SegmentInfo(Long.MIN_VALUE, segmentProperties);
            }

            boolean sameSegment = segmentProperties.getName().equals(state.getSegmentName());
            if (!sameSegment) {
                String message = String.format("Stored State for segment '%s' is corrupted. It refers to a different segment '%s'.", segmentProperties.getName(), state.getSegmentName());
                throw new CompletionException(new DataCorruptionException(message));
            }

            return new SegmentInfo(state.getSegmentId(), new StreamSegmentInformation(segmentProperties, state.getAttributes()));
        });
    }

    /**
     * Same as {@link #submitToOperationLog}, but executed through {@link #retryWithCleanup}.
     *
     * @param segmentInfo Info about the Segment to map.
     * @param j           The id of the parent Segment, or Long.MIN_VALUE if there is none.
     * @param duration    Timeout for each attempt.
     * @return A future with the id assigned to the Segment.
     */
    private CompletableFuture<Long> submitToOperationLogWithRetry(SegmentInfo segmentInfo, long j, Duration duration) {
        Supplier<CompletableFuture<Long>> attempt = () -> submitToOperationLog(segmentInfo, j, duration);
        return retryWithCleanup(attempt);
    }

    /**
     * Adds a mapping operation for the given Segment to the operation log and, once the add completes,
     * completes the callbacks pending for that Segment with the assigned id.
     *
     * @param segmentInfo Info about the Segment to map.
     * @param j           The id of the parent Segment; a valid id makes this a Transaction mapping,
     *                    Long.MIN_VALUE a stand-alone Segment mapping.
     * @param duration    Timeout for the log add.
     * @return A future with the Segment's id. It fails with StreamSegmentNotExistsException when the
     * Segment's properties mark it as deleted.
     */
    private CompletableFuture<Long> submitToOperationLog(SegmentInfo segmentInfo, long j, Duration duration) {
        SegmentProperties properties = segmentInfo.getProperties();
        if (properties.isDeleted()) {
            // Fail the pending callbacks and the caller with the same exception. It carries the Segment's
            // name, as at every other place in this class where this exception type is created.
            StreamSegmentNotExistsException notExists = new StreamSegmentNotExistsException(properties.getName());
            failAssignment(properties.getName(), notExists);
            return FutureHelpers.failedFuture(notExists);
        }

        // An id may have been assigned in the meantime; if so, reuse it and skip the log.
        long existingId = this.containerMetadata.getStreamSegmentId(properties.getName(), true);
        if (isValidStreamSegmentId(existingId)) {
            completeAssignment(properties.getName(), existingId);
            return CompletableFuture.completedFuture(Long.valueOf(existingId));
        }

        StreamSegmentMapping mapping;
        CompletableFuture<Void> logAdd;
        if (isValidStreamSegmentId(j)) {
            // Transaction: map it to its parent.
            SegmentMetadata parentMetadata = this.containerMetadata.getStreamSegmentMetadata(j);
            if (!$assertionsDisabled && parentMetadata == null) {
                throw new AssertionError("parentMetadata is null");
            }
            TransactionMapOperation transactionMap = new TransactionMapOperation(j, properties);
            mapping = applySegmentId(segmentInfo, transactionMap);
            logAdd = this.durableLog.add(transactionMap, duration);
        } else {
            // Stand-alone Segment.
            StreamSegmentMapOperation segmentMap = new StreamSegmentMapOperation(properties);
            mapping = applySegmentId(segmentInfo, segmentMap);
            logAdd = this.durableLog.add(segmentMap, duration);
        }

        // The id is read from the mapping only after the log add has completed.
        StreamSegmentMapping completedMapping = mapping;
        return logAdd.thenApply(v -> Long.valueOf(completeAssignment(properties.getName(), completedMapping.getStreamSegmentId())));
    }

    /**
     * Copies the Segment id from the given SegmentInfo onto the mapping, if the info carries one.
     *
     * @param segmentInfo          The info that may carry a previously stored Segment id.
     * @param streamSegmentMapping The mapping to update.
     * @return The same mapping instance that was passed in.
     */
    private StreamSegmentMapping applySegmentId(SegmentInfo segmentInfo, StreamSegmentMapping streamSegmentMapping) {
        if (segmentInfo.getSegmentId() == Long.MIN_VALUE) {
            // No stored id; leave the mapping untouched.
            return streamSegmentMapping;
        }
        streamSegmentMapping.setStreamSegmentId(segmentInfo.getSegmentId());
        return streamSegmentMapping;
    }

    /**
     * Completes all callbacks pending for the given Segment with the given id.
     *
     * @param str The name of the Segment.
     * @param j   The id assigned to the Segment.
     * @return The id that was passed in.
     */
    private long completeAssignment(String str, long j) {
        // Decompiled assert: only checked when assertions are enabled for this class.
        if (!$assertionsDisabled && j == Long.MIN_VALUE) {
            throw new AssertionError("no valid streamSegmentId given");
        }
        finishPendingRequests(str, PendingRequest::complete, Long.valueOf(j));
        return j;
    }

    /**
     * Fails all callbacks pending for the given Segment with the given exception.
     *
     * @param str The name of the Segment.
     * @param th  The failure cause.
     */
    private void failAssignment(String str, Throwable th) {
        finishPendingRequests(str, PendingRequest::completeExceptionally, th);
    }

    /**
     * Drains the callbacks pending for the given Segment by repeatedly applying the given completion
     * method to them, until no callbacks are left.
     *
     * @param str        The name of the Segment whose pending callbacks are to be finished.
     * @param biConsumer The completion method to apply to each drained PendingRequest.
     * @param t          The argument to hand to the completion method.
     * @param <T>        Type of the completion argument.
     */
    private <T> void finishPendingRequests(String str, BiConsumer<PendingRequest, T> biConsumer, T t) {
        PendingRequest remove;
        // Decompiled assert: only checked when assertions are enabled for this class.
        if (!$assertionsDisabled && str == null) {
            throw new AssertionError("no streamSegmentName given");
        }
        // Loop because callers may queue new callbacks while the drained batch is being executed.
        while (true) {
            synchronized (this.assignmentLock) {
                remove = this.pendingRequests.remove(str);
                if (remove == null || remove.callbacks.size() == 0) {
                    // Nothing (left) to do; the entry stays removed.
                    break;
                } else {
                    // Leave an empty placeholder so callbacks arriving during execution are collected
                    // and picked up by the next iteration.
                    this.pendingRequests.put(str, new PendingRequest());
                }
            }
            // Run the callbacks outside the lock.
            biConsumer.accept(remove, t);
        }
    }

    /**
     * Attaches a failure handler that fails the callbacks pending for the given Segment, then propagates
     * the failure.
     *
     * @param completableFuture The future to observe.
     * @param str               The name of the Segment whose pending callbacks should be failed.
     * @return A future that mirrors a successful result, or fails with a CompletionException wrapping
     * the original failure.
     */
    private CompletableFuture<Long> withFailureHandler(CompletableFuture<Long> completableFuture, String str) {
        Function<Throwable, Long> onFailure = failure -> {
            failAssignment(str, failure);
            throw new CompletionException(failure);
        };
        return completableFuture.exceptionally(onFailure);
    }

    /**
     * Checks whether a Transaction may be created for the given parent Segment.
     *
     * @param segmentProperties The properties of the parent Segment.
     * @return A completed future if the parent is neither deleted nor sealed; otherwise a future failed
     * with IllegalArgumentException.
     */
    private CompletableFuture<Void> validateParentSegmentEligibility(SegmentProperties segmentProperties) {
        boolean ineligible = segmentProperties.isDeleted() || segmentProperties.isSealed();
        if (ineligible) {
            return FutureHelpers.failedFuture(new IllegalArgumentException("Cannot create a Transaction for a deleted or sealed Segment."));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Tells whether the given value is a real Segment id, as opposed to the Long.MIN_VALUE marker that
     * this class uses for "no id".
     *
     * @param j The id to check.
     * @return True for any value other than Long.MIN_VALUE.
     */
    private boolean isValidStreamSegmentId(long j) {
        return Long.MIN_VALUE != j;
    }

    /**
     * Runs the given operation. If it fails with TooManyActiveSegmentsException, triggers a metadata
     * cleanup and runs the operation again, exactly once. Any other failure is passed on unchanged.
     *
     * @param supplier Creates a new attempt of the operation each time it is invoked.
     * @param <T>      Type of the operation's result.
     * @return A future with the result of the first attempt, or of the second one if a retry happened.
     */
    private <T> CompletableFuture<T> retryWithCleanup(Supplier<CompletableFuture<T>> supplier) {
        CompletableFuture<T> result = new CompletableFuture<>();
        supplier.get()
                .thenAccept(result::complete)
                .exceptionally(failure -> {
                    try {
                        if (ExceptionHelpers.getRealException(failure) instanceof TooManyActiveSegmentsException) {
                            log.debug("{}: Forcing metadata cleanup due to capacity exceeded ({}).", this.traceObjectId, ExceptionHelpers.getRealException(failure).getMessage());
                            // The second attempt is not wrapped in another retry: its outcome is final.
                            CompletableFuture<T> retry = this.metadataCleanup.get().thenComposeAsync(v -> supplier.get(), this.executor);
                            retry.thenAccept(result::complete);
                            FutureHelpers.exceptionListener(retry, result::completeExceptionally);
                        } else {
                            result.completeExceptionally(failure);
                        }
                        return null;
                    } catch (Throwable handlerFailure) {
                        // Make sure the caller is never left with a future that does not complete.
                        result.completeExceptionally(handlerFailure);
                        throw handlerFailure;
                    }
                });
        return result;
    }

    // Initializes the static finals that are declared without initializers at the top of the class.
    static {
        log = LoggerFactory.getLogger(StreamSegmentMapper.class);
        // True when assertions are disabled for this class; guards the AssertionError checks.
        $assertionsDisabled = !StreamSegmentMapper.class.desiredAssertionStatus();
    }
}
