package io.pravega.segmentstore.server.writer;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.AbstractTimer;
import io.pravega.common.Exceptions;
import io.pravega.common.TimeoutTimer;
import io.pravega.common.concurrent.Futures;
import io.pravega.segmentstore.contracts.AttributeUpdate;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.StreamSegmentMergedException;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.contracts.StreamSegmentSealedException;
import io.pravega.segmentstore.server.DataCorruptionException;
import io.pravega.segmentstore.server.SegmentOperation;
import io.pravega.segmentstore.server.UpdateableSegmentMetadata;
import io.pravega.segmentstore.server.WriterFlushResult;
import io.pravega.segmentstore.server.WriterSegmentProcessor;
import io.pravega.segmentstore.server.logs.operations.AttributeUpdaterOperation;
import io.pravega.segmentstore.server.logs.operations.StreamSegmentSealOperation;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/writer/AttributeAggregator.class */
class AttributeAggregator implements WriterSegmentProcessor, AutoCloseable {

    // Class-wide SLF4J logger.
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AttributeAggregator.class);
    // Metadata of the segment served by this aggregator (ids, name, type, sealed/merged/deleted flags).
    private final UpdateableSegmentMetadata metadata;
    // Supplies the flush thresholds (attribute count, elapsed time) and the flush timeout.
    private final WriterConfig config;
    // Source of elapsed time; used to measure the time since the last flush.
    private final AbstractTimer timer;
    // Executor passed to the asynchronous continuations and loops started by this class.
    private final Executor executor;
    // Prefix for log messages, formatted as "AttributeAggregator[<containerId>-<segmentId>]".
    private final String traceObjectId;
    // Target of persistAttributes, sealAttributes and notifyAttributesPersisted calls.
    private final WriterDataSource dataSource;
    // Value of timer.getElapsed() captured at construction and after each completed flush.
    private final AtomicReference<Duration> lastFlush;
    // Pending attribute values, their sequence number range and the pending-seal flag.
    private final State state;
    // Set to true by close(); checked by add() and flush().
    private final AtomicBoolean closed;
    // Most recent root pointer queued for notification; null when no notification is pending.
    private final AtomicReference<RootPointerInfo> lastRootPointer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/AttributeAggregator$RootPointerInfo.class */
    /**
     * Immutable pair made of an attribute index root pointer and the sequence number of the
     * last operation that was included when that root pointer was produced.
     */
    public static class RootPointerInfo {
        private final long rootPointer;
        private final long lastSequenceNumber;

        /**
         * Creates a new instance.
         *
         * @param rootPointer        The root pointer value.
         * @param lastSequenceNumber The sequence number associated with the root pointer.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"rootPointer", "lastSequenceNumber"})
        public RootPointerInfo(long rootPointer, long lastSequenceNumber) {
            this.rootPointer = rootPointer;
            this.lastSequenceNumber = lastSequenceNumber;
        }

        /**
         * @return The root pointer value.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getRootPointer() {
            return this.rootPointer;
        }

        /**
         * @return The sequence number associated with the root pointer.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getLastSequenceNumber() {
            return this.lastSequenceNumber;
        }

        @Override
        public String toString() {
            return String.format("RootPointer=%s, LastSeqNo=%s", this.rootPointer, this.lastSequenceNumber);
        }

        /**
         * Two instances are equal when both sides agree they are comparable (see
         * {@link #canEqual(Object)}) and both values match.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof RootPointerInfo)) {
                return false;
            }
            final RootPointerInfo other = (RootPointerInfo) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            if (getRootPointer() != other.getRootPointer()) {
                return false;
            }
            return getLastSequenceNumber() == other.getLastSequenceNumber();
        }

        /**
         * Hook used by {@link #equals(Object)} so that subclasses can restrict which objects
         * they may be compared with.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof RootPointerInfo;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public int hashCode() {
            // Long.hashCode(v) is (int) (v ^ (v >>> 32)), i.e. the same fold that was spelled out by hand.
            final int partial = 59 + Long.hashCode(getRootPointer());
            return partial * 59 + Long.hashCode(getLastSequenceNumber());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Accumulates the not-yet-flushed extended attribute values for one segment, together with
     * the sequence number range they came from and whether a seal is pending.
     */
    @ThreadSafe
    /* loaded from: input_file:io/pravega/segmentstore/server/writer/AttributeAggregator$State.class */
    public static class State {
        private final Map<UUID, Long> attributes;
        private final AtomicLong firstSequenceNumber;
        private final AtomicLong lastSequenceNumber;
        private final AtomicLong lastPersistedSequenceNumber;
        private final AtomicBoolean sealed;

        /**
         * Creates an empty, unsealed State.
         *
         * @param lastPersistedSequenceNumber Initial value for the last persisted sequence number;
         *                                    Long.MIN_VALUE means "not known yet".
         */
        State(long lastPersistedSequenceNumber) {
            this.lastPersistedSequenceNumber = new AtomicLong(lastPersistedSequenceNumber);
            this.attributes = Collections.synchronizedMap(new HashMap<>());
            this.firstSequenceNumber = new AtomicLong(Long.MIN_VALUE);
            this.lastSequenceNumber = new AtomicLong(Long.MIN_VALUE);
            this.sealed = new AtomicBoolean(false);
        }

        /** Records that a seal is pending. */
        void seal() {
            this.sealed.set(true);
        }

        /** @return True if a seal is pending. */
        boolean hasSeal() {
            return this.sealed.get();
        }

        /**
         * Merges the non-core attribute updates of the given operation into the pending set.
         *
         * @param operation The operation to include.
         * @return True if at least one non-core attribute was recorded, false if the operation was
         * at or below the last persisted sequence number or carried no non-core attributes.
         * @throws IllegalStateException If a seal is pending.
         */
        boolean include(AttributeUpdaterOperation operation) {
            Preconditions.checkState(!this.sealed.get(), "Cannot accept more operations after sealing.");
            final long sequenceNumber = operation.getSequenceNumber();
            if (sequenceNumber <= this.lastPersistedSequenceNumber.get()) {
                // Already covered by what has been persisted.
                return false;
            }

            boolean recordedAny = false;
            if (operation.getAttributeUpdates() != null) {
                for (AttributeUpdate update : operation.getAttributeUpdates()) {
                    if (Attributes.isCoreAttribute(update.getAttributeId())) {
                        continue;
                    }
                    this.attributes.put(update.getAttributeId(), update.getValue());
                    recordedAny = true;
                }
            }

            if (!recordedAny) {
                return false;
            }

            // Only the first included operation sets the range start and the unset persisted marker.
            this.firstSequenceNumber.compareAndSet(Long.MIN_VALUE, sequenceNumber);
            this.lastPersistedSequenceNumber.compareAndSet(Long.MIN_VALUE, sequenceNumber - 1);
            this.lastSequenceNumber.set(sequenceNumber);
            return true;
        }

        /** @return Sequence number of the first included operation, or Long.MIN_VALUE if none. */
        long getFirstSequenceNumber() {
            return this.firstSequenceNumber.get();
        }

        /** @return Sequence number of the last included operation, or Long.MIN_VALUE if none. */
        long getLastSequenceNumber() {
            return this.lastSequenceNumber.get();
        }

        /** @return The last persisted sequence number, or Long.MIN_VALUE if not set. */
        long getLastPersistedSequenceNumber() {
            return this.lastPersistedSequenceNumber.get();
        }

        /** Overwrites the last persisted sequence number. */
        void setLastPersistedSequenceNumber(long value) {
            this.lastPersistedSequenceNumber.set(value);
        }

        /** @return Number of distinct pending attributes. */
        int size() {
            return this.attributes.size();
        }

        /** @return The live (synchronized) map of pending attributes; not a copy. */
        Map<UUID, Long> getAttributes() {
            return this.attributes;
        }

        /** Clears the pending attributes, resets the sequence number range and drops the pending seal. */
        void acceptChanges() {
            this.attributes.clear();
            this.firstSequenceNumber.set(Long.MIN_VALUE);
            this.lastSequenceNumber.set(Long.MIN_VALUE);
            this.sealed.set(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AttributeAggregator(@NonNull UpdateableSegmentMetadata updateableSegmentMetadata, @NonNull WriterDataSource writerDataSource, @NonNull WriterConfig writerConfig, @NonNull AbstractTimer abstractTimer, @NonNull Executor executor) {
        if (updateableSegmentMetadata == null) {
            throw new NullPointerException("segmentMetadata is marked non-null but is null");
        }
        if (writerDataSource == null) {
            throw new NullPointerException("dataSource is marked non-null but is null");
        }
        if (writerConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        if (abstractTimer == null) {
            throw new NullPointerException("timer is marked non-null but is null");
        }
        if (executor == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        this.metadata = updateableSegmentMetadata;
        this.config = writerConfig;
        this.dataSource = writerDataSource;
        this.timer = abstractTimer;
        this.executor = executor;
        this.lastFlush = new AtomicReference<>(abstractTimer.getElapsed());
        Preconditions.checkArgument(this.metadata.getContainerId() == writerDataSource.getId(), "SegmentMetadata.ContainerId is different from WriterDataSource.Id");
        this.traceObjectId = String.format("AttributeAggregator[%d-%d]", Integer.valueOf(this.metadata.getContainerId()), Long.valueOf(this.metadata.getId()));
        this.state = new State(updateableSegmentMetadata.getAttributes().getOrDefault(Attributes.ATTRIBUTE_SEGMENT_PERSIST_SEQ_NO, Long.MIN_VALUE).longValue());
        this.closed = new AtomicBoolean();
        this.lastRootPointer = new AtomicReference<>();
    }

    /**
     * Marks this aggregator as closed. After this call {@link #isClosed()} returns true.
     */
    @Override
    public void close() {
        closed.set(true);
    }

    /**
     * Gets the lowest sequence number that has not yet been committed.
     *
     * @return The first pending sequence number when no root pointer update is queued or when no
     * persisted sequence number is known (Long.MIN_VALUE); otherwise the last persisted sequence
     * number plus one.
     */
    @Override
    public long getLowestUncommittedSequenceNumber() {
        final boolean rootPointerUpdatePending = this.lastRootPointer.get() != null;
        if (rootPointerUpdatePending) {
            final long lastPersisted = this.state.getLastPersistedSequenceNumber();
            if (lastPersisted != Long.MIN_VALUE) {
                return lastPersisted + 1;
            }
        }
        return this.state.getFirstSequenceNumber();
    }

    /**
     * @return True if {@link #close()} has been invoked on this instance.
     */
    @Override
    public boolean isClosed() {
        return closed.get();
    }

    /**
     * Summarises the segment id and name, the number of pending attributes, the lowest
     * uncommitted and last sequence numbers, and the whole seconds elapsed since the last flush.
     */
    @Override
    public String toString() {
        return String.format(
                "[%d: %s] Count = %d, LUSN = %d, LastSeqNo = %d, LastFlush = %ds",
                this.metadata.getId(),
                this.metadata.getName(),
                this.state.size(),
                getLowestUncommittedSequenceNumber(),
                this.state.getLastSequenceNumber(),
                getElapsedSinceLastFlush().toMillis() / 1000);
    }

    /**
     * Adds the given operation to this aggregator.
     * Seal operations mark the state as sealed; attribute updater operations have their non-core
     * attributes merged into the pending state. Any other operation type is ignored, as is every
     * operation once the segment is deleted or merged.
     *
     * @param segmentOperation The operation to add.
     * @throws DataCorruptionException  If an attribute update that is not internal or that touches
     *                                  non-core attributes arrives after a seal.
     * @throws IllegalArgumentException If the operation refers to a different segment.
     */
    @Override
    public void add(SegmentOperation segmentOperation) throws DataCorruptionException {
        Exceptions.checkNotClosed(isClosed(), this);
        Preconditions.checkArgument(segmentOperation.getStreamSegmentId() == this.metadata.getId(), "Operation '%s' refers to a different Segment than this one (%s).", segmentOperation, this.metadata.getId());
        if (isSegmentDeleted()) {
            return;
        }

        final boolean accepted;
        if (segmentOperation instanceof StreamSegmentSealOperation) {
            this.state.seal();
            accepted = true;
        } else if (segmentOperation instanceof AttributeUpdaterOperation) {
            final AttributeUpdaterOperation update = (AttributeUpdaterOperation) segmentOperation;
            if (this.state.hasSeal()) {
                // After a seal, only internal updates restricted to core attributes are tolerated (and dropped).
                if (update.isInternal() && update.hasOnlyCoreAttributes()) {
                    log.debug("{}: Ignored internal operation on sealed segment {}.", this.traceObjectId, segmentOperation);
                    return;
                }
                throw new DataCorruptionException(String.format("Illegal operation for a sealed Segment; received '%s'.", segmentOperation), new Object[0]);
            }
            accepted = this.state.include(update);
        } else {
            accepted = false;
        }

        if (accepted) {
            log.debug("{}: Add {}; OpCount={}.", this.traceObjectId, segmentOperation, this.state.size());
        }
    }

    /**
     * Indicates whether a flush is required right now.
     *
     * @return False if the segment is deleted or merged. Otherwise true if a seal is pending, if
     * the number of pending attributes reached the configured threshold, or if there is at least
     * one pending attribute and the configured time threshold has elapsed since the last flush.
     */
    @Override
    public boolean mustFlush() {
        if (isSegmentDeleted()) {
            return false;
        }
        if (this.state.hasSeal()) {
            return true;
        }
        if (this.state.size() >= this.config.getFlushAttributesThreshold()) {
            return true;
        }
        if (this.state.size() <= 0) {
            // Nothing pending, so the time threshold is irrelevant.
            return false;
        }
        return getElapsedSinceLastFlush().compareTo(this.config.getFlushThresholdTime()) >= 0;
    }

    /**
     * Persists the pending attributes and, if a seal is pending, seals the attribute index.
     * Failures that are expected given the segment's metadata (see handleAttributeException)
     * are treated as success.
     *
     * @param z        If true, flush even when {@link #mustFlush()} returns false.
     * @param duration Timeout for the whole operation.
     * @return A future that completes with the flush result (including the number of flushed
     * attributes) once persisting and, if applicable, sealing are done. If nothing needs flushing,
     * an already completed future with an empty result.
     */
    @Override
    public CompletableFuture<WriterFlushResult> flush(boolean z, Duration duration) {
        Exceptions.checkNotClosed(isClosed(), this);
        if (!z && !mustFlush()) {
            return CompletableFuture.completedFuture(new WriterFlushResult());
        }

        TimeoutTimer timeoutTimer = new TimeoutTimer(duration);
        CompletableFuture<Void> pending = handleAttributeException(
                persistPendingAttributes(this.state.getAttributes(), this.state.getLastSequenceNumber(), timeoutTimer));
        if (this.state.hasSeal()) {
            // Run the seal stage on this component's executor, like every other async stage in this
            // class; without an explicit executor it would run on ForkJoinPool.commonPool().
            pending = pending.thenComposeAsync(ignored -> handleAttributeException(sealAttributes(timeoutTimer)), this.executor);
        }

        return pending.thenApply(ignored -> {
            if (this.state.size() > 0) {
                log.debug("{}: Flushed. Count={}, SeqNo={}-{}, Forced={}.", this.traceObjectId, this.state.size(),
                        this.state.getFirstSequenceNumber(), this.state.getLastSequenceNumber(), z);
            }

            WriterFlushResult writerFlushResult = new WriterFlushResult();
            writerFlushResult.withFlushedAttributes(this.state.size());
            // Reset the pending state only after the result has captured the flushed count.
            this.state.acceptChanges();
            this.lastFlush.set(this.timer.getElapsed());
            return writerFlushResult;
        });
    }

    /**
     * Persists the given attributes via the data source and then queues a root pointer update
     * for the value the data source returned.
     *
     * @param map          The attributes to persist.
     * @param j            Sequence number to associate with the resulting root pointer.
     * @param timeoutTimer Timer providing the remaining time for the operation.
     * @return A future that completes when the attributes are persisted and the update is queued;
     * already completed if there is nothing to persist.
     */
    private CompletableFuture<Void> persistPendingAttributes(Map<UUID, Long> map, long j, TimeoutTimer timeoutTimer) {
        if (map.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        return this.dataSource
                .persistAttributes(this.metadata.getId(), map, timeoutTimer.getRemaining())
                .thenAcceptAsync(rootPointer -> queueRootPointerUpdate(rootPointer, j), this.executor);
    }

    /**
     * Asks the data source to seal the attribute index of this segment.
     *
     * @param timeoutTimer Timer providing the remaining time for the operation.
     * @return The future returned by the data source.
     */
    private CompletableFuture<Void> sealAttributes(TimeoutTimer timeoutTimer) {
        log.debug("{}: Sealing Attribute Index.", this.traceObjectId);
        final long segmentId = this.metadata.getId();
        return this.dataSource.sealAttributes(segmentId, timeoutTimer.getRemaining());
    }

    /**
     * Records the given root pointer as the latest one to report and, if no report was already
     * queued, starts an asynchronous loop that reports the latest value to the data source until
     * no newer value has been recorded in the meantime.
     *
     * @param j  The new root pointer.
     * @param j2 The sequence number associated with the new root pointer.
     */
    private void queueRootPointerUpdate(long j, long j2) {
        // Publish the new value. A null previous value means nothing was queued, so this call starts the loop;
        // otherwise the new value simply replaces the queued one and no second loop is started.
        if (this.lastRootPointer.getAndSet(new RootPointerInfo(j, j2)) == null) {
            // Loop condition flag; cleared below once the value that was just reported is still the latest one.
            AtomicBoolean atomicBoolean = new AtomicBoolean(this.lastRootPointer.get() != null);
            // Explicit null check emitted ahead of binding the atomicBoolean::get method reference.
            Objects.requireNonNull(atomicBoolean);
            Futures.loop(atomicBoolean::get, () -> {
                // Each iteration reports whatever the most recent queued value is at that moment.
                RootPointerInfo rootPointerInfo = this.lastRootPointer.get();
                log.debug("{}: Updating Root Pointer info to {}.", this.traceObjectId, rootPointerInfo);
                return this.dataSource.notifyAttributesPersisted(this.metadata.getId(), this.metadata.getType(), rootPointerInfo.getRootPointer(), rootPointerInfo.getLastSequenceNumber(), this.config.getFlushTimeout()).whenCompleteAsync((r7, th) -> {
                    if (th != null) {
                        logAttributesPersistedError(th, rootPointerInfo);
                    } else {
                        // Only a successful notification advances the last persisted sequence number.
                        this.state.setLastPersistedSequenceNumber(rootPointerInfo.getLastSequenceNumber());
                    }
                    // Clear the queued value only if it was not replaced while this notification was in flight;
                    // if it was replaced, the flag stays set so that the newer value can be reported.
                    if (this.lastRootPointer.compareAndSet(rootPointerInfo, null)) {
                        atomicBoolean.set(false);
                    }
                    // NOTE(review): whenCompleteAsync still completes exceptionally when th != null. How
                    // Futures.loop treats a failed iteration is not visible here - confirm it cannot stop
                    // while lastRootPointer is still non-null, which would prevent any later loop from starting.
                }, this.executor);
            }, this.executor);
        }
    }

    /**
     * Logs a failure to persist a root pointer. Failures caused by the segment having been merged
     * or no longer existing are logged at info level; everything else at error level.
     *
     * @param th              The failure, possibly wrapped.
     * @param rootPointerInfo The root pointer that could not be persisted.
     */
    private void logAttributesPersistedError(Throwable th, RootPointerInfo rootPointerInfo) {
        final Throwable cause = Exceptions.unwrap(th);
        final boolean segmentGone = cause instanceof StreamSegmentMergedException
                || cause instanceof StreamSegmentNotExistsException;
        if (!segmentGone) {
            log.error("{}: Unable to persist root pointer {}.", this.traceObjectId, rootPointerInfo, cause);
            return;
        }
        log.info("{}: Unable to persist root pointer {} due to segment being merged or deleted.", this.traceObjectId, rootPointerInfo);
    }

    /**
     * Wraps the given future so that failures which are expected given the segment's metadata
     * yield a null result instead: a sealed-segment exception when the metadata says the segment
     * is sealed, or a not-exists/merged exception when the metadata says it is merged or deleted.
     *
     * @param completableFuture The future to wrap.
     * @param <T>               Result type of the future.
     * @return The wrapped future.
     */
    private <T> CompletableFuture<T> handleAttributeException(CompletableFuture<T> completableFuture) {
        // NOTE(review): the explicit (Object) cast on the fallback value looks like a decompiler
        // artifact - confirm against the signature of Futures.exceptionallyExpecting.
        return Futures.exceptionallyExpecting(completableFuture, th -> {
            if (th instanceof StreamSegmentSealedException && this.metadata.isSealed()) {
                return true;
            }
            final boolean segmentGone = th instanceof StreamSegmentNotExistsException
                    || th instanceof StreamSegmentMergedException;
            return segmentGone && (this.metadata.isMerged() || this.metadata.isDeleted());
        }, (Object) null);
    }

    /**
     * @return True if the segment's metadata reports it as deleted or as merged.
     */
    private boolean isSegmentDeleted() {
        if (this.metadata.isDeleted()) {
            return true;
        }
        return this.metadata.isMerged();
    }

    /**
     * @return The difference between the timer's current elapsed time and the elapsed time
     * recorded at the last flush (or at construction, if no flush has completed yet).
     */
    private Duration getElapsedSinceLastFlush() {
        final Duration now = this.timer.getElapsed();
        final Duration atLastFlush = this.lastFlush.get();
        return now.minus(atLastFlush);
    }
}
