package io.pravega.controller.server.bucket;

import io.pravega.client.stream.Stream;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.tracing.RequestTracker;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.RetryHelper;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/bucket/PeriodicRetention.class */
/**
 * Runs the retention workflow for a single stream as a background task.
 *
 * <p>Each invocation is tagged with a freshly generated request id, registered with the
 * {@link RequestTracker} for the duration of the call, retried through {@link RetryHelper},
 * and unregistered once the retried work has finished (successfully or not).
 */
public class PeriodicRetention {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(PeriodicRetention.class));

    private final StreamMetadataTasks streamMetadataTasks;
    private final StreamMetadataStore streamMetadataStore;
    private final ScheduledExecutorService executor;
    private final RequestTracker requestTracker;
    /** Source of request ids; backed by {@code nextLong()} on a {@link Random} from {@link RandomFactory}. */
    private final Supplier<Long> requestIdGenerator;

    /**
     * Creates a retention runner that uses the given collaborators.
     *
     * @param streamMetadataStore      store used to create the operation context and read the stream configuration
     * @param streamMetadataTasks      tasks object that performs the actual retention and supplies the delegation token
     * @param scheduledExecutorService executor passed to the metadata store and to the retry helper
     * @param requestTracker           tracker in which each retention request is registered while it runs
     */
    public PeriodicRetention(StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService scheduledExecutorService, RequestTracker requestTracker) {
        // A bound method reference null-checks its receiver when it is evaluated,
        // so no separate Objects.requireNonNull call is needed here.
        this.requestIdGenerator = RandomFactory.create()::nextLong;
        this.streamMetadataTasks = streamMetadataTasks;
        this.streamMetadataStore = streamMetadataStore;
        this.executor = scheduledExecutorService;
        this.requestTracker = requestTracker;
    }

    /**
     * Performs retention for the given stream.
     *
     * <p>The stream's configuration is read from the metadata store and its retention policy is
     * handed to {@link StreamMetadataTasks}, together with the current wall-clock time, the
     * operation context and a delegation token. Every failed attempt is logged and rethrown
     * wrapped in a {@link CompletionException} so that the retry helper sees the failure.
     * If the retried operation still fails, the failure is logged and swallowed; the
     * returned future does not surface retention errors to the caller.
     *
     * @param stream the stream whose scope and name identify the retention target
     * @return a future that completes after the request has been untracked
     */
    public CompletableFuture<Void> retention(Stream stream) {
        final String scope = stream.getScope();
        final String streamName = stream.getStreamName();

        // Register this run under a fresh request id so that log lines can be correlated.
        final long requestId = this.requestIdGenerator.get();
        final String requestDescriptor = RequestTracker.buildRequestDescriptor("truncateStream", scope, streamName);
        this.requestTracker.trackRequest(requestDescriptor, requestId);

        final OperationContext context = this.streamMetadataStore.createStreamContext(scope, streamName, requestId);
        log.debug(requestId, "Periodic background processing for retention called for stream {}/{}", scope, streamName);

        // NOTE(review): the literal 5 is presumably the maximum number of attempts - confirm against RetryHelper.
        return RetryHelper.withRetriesAsync(
                () -> this.streamMetadataStore.getConfiguration(scope, streamName, context, this.executor)
                        .thenCompose(configuration -> this.streamMetadataTasks.retention(
                                scope,
                                streamName,
                                configuration.getRetentionPolicy(),
                                System.currentTimeMillis(),
                                context,
                                this.streamMetadataTasks.retrieveDelegationToken()))
                        .exceptionally(error -> {
                            // Log the failed attempt, then rethrow so the retry helper observes it.
                            log.warn(requestId, "Exception thrown while performing auto retention for stream {} ", stream, error);
                            throw new CompletionException(error);
                        }),
                RetryHelper.UNCONDITIONAL_PREDICATE, 5, this.executor)
                .exceptionally(error -> {
                    // Deliberate best-effort: swallow the failure so the untrack step below still runs.
                    log.warn(requestId, "Unable to perform retention for stream {}. Ignoring, retention will be attempted in next cycle.", stream, error);
                    return null;
                })
                .thenRun(() -> this.requestTracker.untrackRequest(requestDescriptor));
    }
}
