package io.pravega.segmentstore.storage.chunklayer;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.concurrent.Services;
import io.pravega.segmentstore.storage.metadata.ChunkMetadata;
import io.pravega.segmentstore.storage.metadata.ChunkMetadataStore;
import io.pravega.segmentstore.storage.metadata.MetadataTransaction;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/storage/chunklayer/GarbageCollector.class */
public class GarbageCollector extends AbstractThreadPoolService implements AutoCloseable {

    // SLF4J logger shared by all instances of this class.
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GarbageCollector.class);
    // Value reported by getShutdownTimeout().
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
    // Chunks waiting to be deleted; entries are ordered by their scheduled delete time.
    private final DelayQueue<GarbageChunkInfo> garbageChunks;
    // Used by deleteGarbage to open and delete the chunks themselves.
    private final ChunkStorage chunkStorage;
    // Used by deleteGarbage to start the per-chunk metadata transactions.
    private final ChunkMetadataStore metadataStore;
    // Source of the garbage-collection settings (delay, sleep, max queue size, max concurrency, max attempts).
    private final ChunkedSegmentStorageConfig config;
    // Set by close(); canRun() returns false once it is set.
    private final AtomicBoolean closed;
    // While set, deleteGarbage calls made with their first argument true return without doing any work.
    private final AtomicBoolean suspended;
    // Counter maintained alongside garbageChunks; compared against the configured maximum queue size.
    private final AtomicInteger queueSize;
    // Incremented at the end of every loop iteration started by doRun(); appears in the iteration log lines.
    private final AtomicLong iterationId;
    // Future of the loop created by doRun(); null until doRun() has been called, cancelled by close().
    private CompletableFuture<Void> loopFuture;
    // Supplies the current time; GarbageChunkInfo.getDelay treats the value as milliseconds.
    private final Supplier<Long> currentTimeSupplier;
    // Supplies the future that each loop iteration waits on before collecting garbage.
    private final Supplier<CompletableFuture<Void>> delaySupplier;
    // Executor on which deleteGarbage runs its storage and metadata continuation stages.
    private final ScheduledExecutorService storageExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/segmentstore/storage/chunklayer/GarbageCollector$GarbageChunkInfo.class */
    /**
     * Entry of the garbage queue: the name of a chunk, the time at which it becomes eligible for deletion and the
     * number of deletion attempts made so far.
     *
     * <p>Instances are ordered by {@code scheduledDeleteTime} only, so {@link #compareTo} is not consistent with
     * {@link #equals}, which also compares the name and the attempt count.
     */
    public class GarbageChunkInfo implements Delayed {
        private final String name;
        private final long scheduledDeleteTime;
        private final int attempts;

        /**
         * Returns the time remaining until the scheduled delete time, measured against the enclosing collector's
         * {@code currentTimeSupplier}. Both values are treated as milliseconds.
         *
         * @param timeUnit unit in which the remaining delay is expressed.
         * @return remaining delay in {@code timeUnit}; zero or negative once the entry is due.
         */
        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            long remainingMillis = this.scheduledDeleteTime - GarbageCollector.this.currentTimeSupplier.get().longValue();
            return timeUnit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Orders entries by scheduled delete time, earliest first.
         *
         * @param delayed entry to compare with; must be a {@code GarbageChunkInfo}.
         * @return negative, zero or positive as this entry is due before, together with or after {@code delayed}.
         * @throws ClassCastException if {@code delayed} is not a {@code GarbageChunkInfo}.
         */
        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            // Long.compare avoids the overflow that subtracting two longs can produce.
            return Long.compare(this.scheduledDeleteTime, ((GarbageChunkInfo) delayed).scheduledDeleteTime);
        }

        /**
         * Creates a new entry.
         *
         * @param name                name of the chunk.
         * @param scheduledDeleteTime time (milliseconds, same clock as {@code currentTimeSupplier}) at which the
         *                            entry becomes due.
         * @param attempts            number of deletion attempts already made for this chunk.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"name", "scheduledDeleteTime", "attempts"})
        public GarbageChunkInfo(String name, long scheduledDeleteTime, int attempts) {
            this.name = name;
            this.scheduledDeleteTime = scheduledDeleteTime;
            this.attempts = attempts;
        }

        /** Returns the time at which this entry becomes due. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getScheduledDeleteTime() {
            return this.scheduledDeleteTime;
        }

        /** Returns the number of deletion attempts already made. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int getAttempts() {
            return this.attempts;
        }

        /** Two entries are equal when name, scheduled delete time and attempt count all match. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof GarbageChunkInfo)) {
                return false;
            }
            GarbageChunkInfo other = (GarbageChunkInfo) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            return Objects.equals(getName(), other.getName())
                    && getScheduledDeleteTime() == other.getScheduledDeleteTime()
                    && getAttempts() == other.getAttempts();
        }

        /** Hook used by {@link #equals} so that subclasses can opt out of equality with this type. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof GarbageChunkInfo;
        }

        /** Hash over name, scheduled delete time and attempt count (same fields as {@link #equals}). */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            String chunkName = getName();
            int result = 59 + (chunkName == null ? 43 : chunkName.hashCode());
            result = (result * 59) + Long.hashCode(getScheduledDeleteTime());
            return (result * 59) + getAttempts();
        }

        /** Returns a description listing the name, the scheduled delete time and the attempt count. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            // The attempts slot previously printed the name a second time.
            return "GarbageCollector.GarbageChunkInfo(name=" + getName()
                    + ", scheduledDeleteTime=" + getScheduledDeleteTime()
                    + ", attempts=" + getAttempts() + ")";
        }

        /** Returns the name of the chunk. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String getName() {
            return this.name;
        }
    }

    /**
     * Creates a collector that reads the time from {@link System#currentTimeMillis()} and whose loop waits, before
     * each iteration, on a {@code Futures.delayedFuture} of {@code getGarbageCollectionSleep()} scheduled on the
     * given executor.
     *
     * @param i                           number formatted into the service name ({@code GarbageCollector[i]}).
     * @param chunkStorage                storage from which chunks are deleted.
     * @param chunkMetadataStore          store holding the chunk metadata.
     * @param chunkedSegmentStorageConfig garbage-collection settings.
     * @param scheduledExecutorService    executor used for storage operations and for the inter-iteration delay.
     */
    public GarbageCollector(int i, ChunkStorage chunkStorage, ChunkMetadataStore chunkMetadataStore, ChunkedSegmentStorageConfig chunkedSegmentStorageConfig, ScheduledExecutorService scheduledExecutorService) {
        this(i,
                chunkStorage,
                chunkMetadataStore,
                chunkedSegmentStorageConfig,
                scheduledExecutorService,
                System::currentTimeMillis,
                () -> Futures.delayedFuture(chunkedSegmentStorageConfig.getGarbageCollectionSleep(), scheduledExecutorService));
    }

    /**
     * Creates a collector with an explicit clock and an explicit inter-iteration delay.
     *
     * <p>A dedicated single-thread scheduled pool named {@code storage-gc} is created and handed to the base class.
     * If any argument check fails, that pool is shut down before the exception is rethrown.
     *
     * @param i                           number formatted into the service name ({@code GarbageCollector[i]}).
     * @param chunkStorage                storage from which chunks are deleted; must not be null.
     * @param chunkMetadataStore          store holding the chunk metadata; must not be null.
     * @param chunkedSegmentStorageConfig garbage-collection settings; must not be null.
     * @param scheduledExecutorService    executor used for storage operations; must not be null.
     * @param supplier                    supplies the current time; must not be null.
     * @param supplier2                   supplies the future awaited before each iteration; must not be null.
     * @throws NullPointerException if any of the object arguments is null.
     */
    public GarbageCollector(int i, ChunkStorage chunkStorage, ChunkMetadataStore chunkMetadataStore, ChunkedSegmentStorageConfig chunkedSegmentStorageConfig, ScheduledExecutorService scheduledExecutorService, Supplier<Long> supplier, Supplier<CompletableFuture<Void>> supplier2) {
        super(String.format("GarbageCollector[%d]", i), ExecutorServiceHelpers.newScheduledThreadPool(1, "storage-gc"));
        this.garbageChunks = new DelayQueue<>();
        this.closed = new AtomicBoolean();
        this.suspended = new AtomicBoolean();
        this.queueSize = new AtomicInteger();
        this.iterationId = new AtomicLong();
        try {
            this.chunkStorage = Preconditions.checkNotNull(chunkStorage, "chunkStorage");
            this.metadataStore = Preconditions.checkNotNull(chunkMetadataStore, "metadataStore");
            this.config = Preconditions.checkNotNull(chunkedSegmentStorageConfig, "config");
            this.currentTimeSupplier = Preconditions.checkNotNull(supplier, "currentTimeSupplier");
            this.delaySupplier = Preconditions.checkNotNull(supplier2, "delaySupplier");
            this.storageExecutor = Preconditions.checkNotNull(scheduledExecutorService, "storageExecutor");
        } catch (Exception e) {
            // The pool was created in the super(...) call above; release it before propagating the failure.
            this.executor.shutdownNow();
            throw e;
        }
    }

    /**
     * Starts this service asynchronously through {@code Services.startAsync}, passing the service's own executor.
     */
    public void initialize() {
        Services.startAsync(this, executor);
    }

    /**
     * Returns the shutdown timeout of this service.
     *
     * @return the constant {@code SHUTDOWN_TIMEOUT} (10 seconds).
     */
    protected Duration getShutdownTimeout() {
        return GarbageCollector.SHUTDOWN_TIMEOUT;
    }

    /**
     * Starts the collection loop and remembers its future in {@code loopFuture}.
     *
     * <p>While {@code canRun()} holds, each iteration waits on the future from {@code delaySupplier}, logs the
     * start of the iteration, calls {@code deleteGarbage(true, config.getGarbageCollectionMaxConcurrency())},
     * logs any error from those steps and finally increments the iteration id. Errors are logged, not propagated,
     * so a failed iteration does not end the loop. All continuations run on the service's executor.
     *
     * @return the future of the loop.
     */
    protected CompletableFuture<Void> doRun() {
        final Executor loopExecutor = this.executor;
        this.loopFuture = Futures.loop(
                this::canRun,
                () -> this.delaySupplier.get()
                        .thenRunAsync(
                                () -> log.info("{}: Iteration {} started.", this.traceObjectId, Long.valueOf(this.iterationId.get())),
                                loopExecutor)
                        .thenComposeAsync(
                                ignored -> deleteGarbage(true, this.config.getGarbageCollectionMaxConcurrency()),
                                loopExecutor)
                        .handleAsync((result, error) -> {
                            if (error != null) {
                                log.error("{}: Error during doRun.", this.traceObjectId, error);
                            }
                            log.info("{}: Iteration {} ended.", this.traceObjectId, Long.valueOf(this.iterationId.getAndIncrement()));
                            return null;
                        }, loopExecutor),
                loopExecutor);
        return this.loopFuture;
    }

    /**
     * Loop condition for {@code doRun()}.
     *
     * @return true only while the service is running, has no stop exception and has not been closed.
     */
    private boolean canRun() {
        if (!isRunning()) {
            return false;
        }
        if (getStopException() != null) {
            return false;
        }
        return !this.closed.get();
    }

    /**
     * Sets the suspended flag. While the flag is set, {@code deleteGarbage} calls made with their first argument
     * true return without doing any work.
     *
     * @param z new value of the flag.
     */
    void setSuspended(boolean z) {
        suspended.set(z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues every chunk name in the collection for deletion, with zero attempts and a scheduled delete time of
     * "current time + {@code config.getGarbageCollectionDelay()}".
     *
     * <p>After all names have been offered, a warning is logged if the queue size counter is at or above the
     * configured maximum.
     *
     * @param collection names of the chunks to queue.
     */
    public void addToGarbage(Collection<String> collection) {
        Long now = this.currentTimeSupplier.get();
        collection.forEach(chunkName ->
                addToGarbage(chunkName, now.longValue() + this.config.getGarbageCollectionDelay().toMillis(), 0));
        boolean queueFull = this.queueSize.get() >= this.config.getGarbageCollectionMaxQueueSize();
        if (queueFull) {
            log.warn("{}: deleteGarbage - Queue full. Could not delete garbage. Chunks skipped", this.traceObjectId);
        }
    }

    /**
     * Queues a single chunk for deletion unless the queue size counter has reached the configured maximum, in
     * which case the chunk is dropped and a debug message is logged.
     *
     * @param str name of the chunk.
     * @param j   scheduled delete time for the new entry.
     * @param i   number of deletion attempts already made for this chunk.
     */
    void addToGarbage(String str, long j, int i) {
        boolean queueFull = this.queueSize.get() >= this.config.getGarbageCollectionMaxQueueSize();
        if (queueFull) {
            log.debug("{}: deleteGarbage - Queue full. Could not delete garbage. chunk {}.", this.traceObjectId, str);
            return;
        }
        this.garbageChunks.add(new GarbageChunkInfo(str, j, i));
        this.queueSize.incrementAndGet();
    }

    /**
     * Processes one batch of due entries from the garbage queue.
     *
     * <p>Steps:
     * <ol>
     * <li>If the collector is suspended and {@code z} is true, return a completed {@code false}.</li>
     * <li>Wait for the first due entry for at most {@code config.getGarbageCollectionDelay()}, then drain further
     *     due entries without blocking until {@code i} entries have been taken. If nothing was taken, return a
     *     completed {@code false}.</li>
     * <li>For every entry, open a metadata transaction and read the chunk's metadata. A chunk whose metadata is
     *     present and active is left alone. Otherwise the chunk is opened for write and deleted; a
     *     {@code ChunkNotFoundException} is tolerated, any other failure marks the entry as failed. The metadata
     *     record is deleted only when it exists, is inactive and the chunk deletion did not fail; the transaction
     *     is then committed.</li>
     * <li>A failed entry is queued again with one more attempt and a later delete time, until
     *     {@code config.getGarbageCollectionMaxAttempts()} is reached. The transaction is always closed.</li>
     * </ol>
     *
     * @param z when true, the call is skipped while the collector is suspended ({@code doRun()} passes true).
     * @param i maximum number of entries to take from the queue; at least one entry is taken if one is due.
     * @return a future that completes with {@code true} once all per-chunk work has finished, or an already
     *         completed {@code false} if no work was done.
     * @throws CompletionException if the calling thread is interrupted while waiting for the first entry; the
     *                             thread's interrupt flag is restored before throwing.
     */
    CompletableFuture<Boolean> deleteGarbage(boolean z, int i) {
        log.debug("{}: deleteGarbage - started.", this.traceObjectId);
        if (this.suspended.get() && z) {
            log.info("{}: deleteGarbage - suspended - sleeping for {}.", this.traceObjectId, this.config.getGarbageCollectionDelay());
            return CompletableFuture.completedFuture(false);
        }

        // Bounded wait for the first entry. An unbounded take() would never return null, which made the
        // "no work" branch below unreachable and could park the calling thread forever on an empty queue.
        GarbageChunkInfo next;
        try {
            next = this.garbageChunks.poll(this.config.getGarbageCollectionDelay().toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so that code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
        log.trace("{}: deleteGarbage - retrieved {}", this.traceObjectId, next);

        ArrayList<GarbageChunkInfo> batch = new ArrayList<>();
        while (null != next) {
            this.queueSize.decrementAndGet();
            batch.add(next);
            if (batch.size() >= i) {
                break;
            }
            // Remaining entries are taken only if they are already due; never block here.
            next = this.garbageChunks.poll();
            log.trace("{}: deleteGarbage - retrieved {}", this.traceObjectId, next);
        }
        if (batch.isEmpty()) {
            log.debug("{}: deleteGarbage - no work - sleeping for {}.", this.traceObjectId, this.config.getGarbageCollectionDelay());
            return CompletableFuture.completedFuture(false);
        }

        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        for (GarbageChunkInfo info : batch) {
            String chunkName = info.name;
            // Set when the chunk or its metadata could not be deleted; triggers the retry below.
            AtomicBoolean failed = new AtomicBoolean();
            MetadataTransaction txn = this.metadataStore.beginTransaction(false, chunkName);
            futures.add(txn.get(info.name).thenComposeAsync(metadata -> {
                ChunkMetadata chunkMetadata = (ChunkMetadata) metadata;
                // The chunk is deleted when it has no metadata at all or when its metadata is inactive.
                boolean shouldDeleteChunk = null == chunkMetadata || !chunkMetadata.isActive();
                // The metadata record is deleted only when it exists and is inactive.
                AtomicBoolean shouldDeleteMetadata = new AtomicBoolean(null != metadata && !chunkMetadata.isActive());
                if (!shouldDeleteChunk) {
                    log.info("{}: deleteGarbage - Chunk is not marked as garbage chunk={}.", this.traceObjectId, chunkName);
                    return CompletableFuture.completedFuture(null);
                }
                return this.chunkStorage.openWrite(chunkName)
                        .thenComposeAsync(this.chunkStorage::delete, this.storageExecutor)
                        .handleAsync((deleted, deleteError) -> {
                            if (deleteError == null) {
                                log.debug("{}: deleteGarbage - deleted chunk={}.", this.traceObjectId, chunkName);
                            } else if (Exceptions.unwrap(deleteError) instanceof ChunkNotFoundException) {
                                // The chunk is already gone; the metadata clean-up still goes ahead.
                                log.debug("{}: deleteGarbage - Could not delete garbage chunk={}.", this.traceObjectId, chunkName);
                            } else {
                                log.warn("{}: deleteGarbage - Could not delete garbage chunk={}.", this.traceObjectId, chunkName);
                                // Keep the metadata so that a later attempt can still find the chunk.
                                shouldDeleteMetadata.set(false);
                                failed.set(true);
                            }
                            return deleted;
                        }, this.storageExecutor)
                        .thenRunAsync(() -> {
                            if (shouldDeleteMetadata.get()) {
                                txn.delete(chunkName);
                                log.debug("{}: deleteGarbage - deleted metadata for chunk={}.", this.traceObjectId, chunkName);
                            }
                        }, this.storageExecutor)
                        .thenComposeAsync(unused -> txn.commit(), this.storageExecutor)
                        .handleAsync((committed, commitError) -> {
                            if (commitError != null) {
                                log.error(String.format("%s deleteGarbage - Could not delete metadata for garbage chunk=%s.", this.traceObjectId, chunkName), commitError);
                                failed.set(true);
                            }
                            return committed;
                        }, this.storageExecutor);
            }, this.storageExecutor).whenCompleteAsync((ignored, lookupError) -> {
                if (failed.get()) {
                    if (info.getAttempts() < this.config.getGarbageCollectionMaxAttempts()) {
                        log.debug("{}: deleteGarbage - adding back chunk={}.", this.traceObjectId, chunkName);
                        addToGarbage(chunkName, info.getScheduledDeleteTime() + this.config.getGarbageCollectionDelay().toMillis(), info.getAttempts() + 1);
                    } else {
                        log.info("{}: deleteGarbage - could not delete after max attempts chunk={}.", this.traceObjectId, chunkName);
                    }
                }
                if (lookupError != null) {
                    log.error(String.format("%s deleteGarbage - Could not find garbage chunk=%s.", this.traceObjectId, chunkName), lookupError);
                }
                txn.close();
            }, this.executor));
        }
        return Futures.allOf(futures).thenApplyAsync(done -> {
            log.debug("{}: deleteGarbage - finished.", this.traceObjectId);
            return true;
        }, this.executor);
    }

    /**
     * Stops the service, cancels the collection loop and releases the executor created by the constructor.
     *
     * <p>Only the first call has any effect; later calls return immediately.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // compareAndSet lets exactly one caller run the shutdown sequence, even if close() is called concurrently.
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        Services.stopAsync(this, this.executor);
        try {
            if (null != this.loopFuture) {
                this.loopFuture.cancel(true);
            }
            super.close();
        } finally {
            // The executor is created in this class's constructor (and shut down there on failure), so it must
            // also be released here; otherwise its worker thread outlives the collector.
            this.executor.shutdownNow();
        }
    }

    /**
     * Returns the queue of chunks waiting to be deleted. This is the live queue, not a copy.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public DelayQueue<GarbageChunkInfo> getGarbageChunks() {
        return garbageChunks;
    }

    /**
     * Returns the counter that tracks how many entries have been added to and taken from the garbage queue.
     * This is the live counter, not a snapshot.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public AtomicInteger getQueueSize() {
        return queueSize;
    }

    /**
     * Returns the counter that is incremented at the end of every loop iteration. This is the live counter, not
     * a snapshot.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public AtomicLong getIterationId() {
        return iterationId;
    }
}
