package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.ZKStoreHelper;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.util.RetryHelper;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/ZKGarbageCollector.class */
class ZKGarbageCollector extends AbstractService implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ZKGarbageCollector.class);
    private static final String GC_ROOT = "/garbagecollection/%s";
    private static final String GUARD_PATH = "/garbagecollection/%s/guard";
    private final ZKStoreHelper zkStoreHelper;
    private final AtomicReference<CompletableFuture<Void>> gcLoop;
    private final CompletableFuture<Void> latch = new CompletableFuture<>();
    private final Supplier<CompletableFuture<Void>> gcProcessingSupplier;
    private final String gcName;
    private final String guardPath;
    private final AtomicReference<NodeCache> watch;
    private final ScheduledExecutorService gcExecutor;
    private final long periodInMillis;
    private final AtomicInteger currentBatch;
    private final AtomicInteger latestVersion;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKGarbageCollector(String str, ZKStoreHelper zKStoreHelper, Supplier<CompletableFuture<Void>> supplier, Duration duration) {
        Preconditions.checkNotNull(zKStoreHelper);
        Preconditions.checkNotNull(supplier);
        Preconditions.checkArgument((duration == null || duration.isNegative()) ? false : true);
        this.gcName = str;
        this.guardPath = String.format(GUARD_PATH, str);
        this.watch = new AtomicReference<>();
        this.zkStoreHelper = zKStoreHelper;
        this.gcProcessingSupplier = supplier;
        this.periodInMillis = duration.toMillis();
        this.gcExecutor = Executors.newSingleThreadScheduledExecutor();
        this.gcLoop = new AtomicReference<>();
        this.currentBatch = new AtomicInteger(0);
        this.latestVersion = new AtomicInteger(0);
    }

    protected void doStart() {
        RetryHelper.withRetriesAsync(() -> {
            return this.zkStoreHelper.createZNodeIfNotExist(this.guardPath).thenCompose(num -> {
                return fetchVersion().thenAccept(r4 -> {
                    this.currentBatch.set(this.latestVersion.get());
                });
            }).thenAccept((Consumer<? super U>) r6 -> {
                this.watch.compareAndSet(null, registerWatch(this.guardPath));
            });
        }, RetryHelper.RETRYABLE_PREDICATE, 5, this.gcExecutor).whenComplete((r6, th) -> {
            if (th == null) {
                notifyStarted();
                this.gcLoop.set(Futures.loop(this::isRunning, () -> {
                    return Futures.delayedFuture(this::process, this.periodInMillis, this.gcExecutor);
                }, this.gcExecutor));
            } else {
                notifyFailed(th);
            }
            this.latch.complete(null);
        });
    }

    protected void doStop() {
        this.latch.thenAccept(r4 -> {
            if (this.gcLoop.updateAndGet(completableFuture -> {
                if (completableFuture != null) {
                    completableFuture.cancel(true);
                    completableFuture.whenComplete((r6, th) -> {
                        if (th == null || (Exceptions.unwrap(th) instanceof CancellationException)) {
                            notifyStopped();
                        } else {
                            log.error("Exception while trying to stop GC {}", this.gcName, th);
                            notifyFailed(th);
                        }
                    });
                }
                return completableFuture;
            }) == null) {
                notifyStopped();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getLatestBatch() {
        return this.currentBatch.get();
    }

    @VisibleForTesting
    CompletableFuture<Void> process() {
        return this.zkStoreHelper.setData(this.guardPath, new byte[0], new Version.IntVersion(this.latestVersion.get())).thenComposeAsync(num -> {
            log.info("Acquired guard, starting GC iteration for {}", this.gcName);
            return this.gcProcessingSupplier.get();
        }, (Executor) this.gcExecutor).exceptionally((Function<Throwable, ? extends U>) th -> {
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof StoreException.WriteConflictException) {
                log.debug("Unable to acquire guard. Will try in next cycle.");
                return null;
            }
            if (unwrap instanceof StoreException.StoreConnectionException) {
                log.info("StoreConnectionException thrown during Garbage Collection iteration for {}.", this.gcName);
                return null;
            }
            log.warn("Exception thrown during Garbage Collection iteration for {}. Log and ignore.", this.gcName, unwrap);
            return null;
        }).thenCompose(r3 -> {
            return fetchVersion();
        });
    }

    @VisibleForTesting
    CompletableFuture<Void> fetchVersion() {
        return this.zkStoreHelper.getData(this.guardPath, bArr -> {
            return bArr;
        }).thenAccept(versionedMetadata -> {
            this.latestVersion.set(versionedMetadata.getVersion().asIntVersion().getIntValue());
        });
    }

    @VisibleForTesting
    void setVersion(int i) {
        this.latestVersion.set(i);
    }

    @VisibleForTesting
    int getVersion() {
        return this.latestVersion.get();
    }

    private NodeCache registerWatch(String str) {
        try {
            NodeCache nodeCache = new NodeCache(this.zkStoreHelper.getClient(), str);
            nodeCache.getListenable().addListener(() -> {
                this.currentBatch.set(nodeCache.getCurrentData().getStat().getVersion());
                log.debug("Current batch for {} changed to {}", this.gcName, Integer.valueOf(this.currentBatch.get()));
            });
            nodeCache.start();
            return nodeCache;
        } catch (Exception e) {
            throw e;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.watch.getAndUpdate(nodeCache -> {
            if (nodeCache != null) {
                try {
                    nodeCache.close();
                } catch (IOException e) {
                    throw Exceptions.sneakyThrow(e);
                }
            }
            return nodeCache;
        });
        this.gcExecutor.shutdown();
    }
}
