package io.pravega.controller.server.retention;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.impl.StreamImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.server.retention.BucketChangeListener;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/retention/StreamCutBucketService.class */
public class StreamCutBucketService extends AbstractService implements BucketChangeListener {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(StreamCutBucketService.class);
    private final int bucketId;
    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final ScheduledExecutorService executor;
    private CompletableFuture<Void> notificationLoop;
    private final LinkedBlockingQueue<BucketChangeListener.StreamNotification> notifications = new LinkedBlockingQueue<>();
    private final ConcurrentMap<Stream, CompletableFuture<Void>> retentionFutureMap = new ConcurrentHashMap();
    private final CompletableFuture<Void> latch = new CompletableFuture<>();

    /* renamed from: io.pravega.controller.server.retention.StreamCutBucketService$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/server/retention/StreamCutBucketService$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType = new int[BucketChangeListener.StreamNotification.NotificationType.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType[BucketChangeListener.StreamNotification.NotificationType.StreamAdded.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType[BucketChangeListener.StreamNotification.NotificationType.StreamRemoved.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType[BucketChangeListener.StreamNotification.NotificationType.StreamUpdated.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType[BucketChangeListener.StreamNotification.NotificationType.ConnectivityError.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamCutBucketService(int i, StreamMetadataStore streamMetadataStore, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService scheduledExecutorService) {
        this.bucketId = i;
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.executor = scheduledExecutorService;
    }

    protected void doStart() {
        RetryHelper.withIndefiniteRetriesAsync(() -> {
            return this.streamMetadataStore.getStreamsForBucket(this.bucketId, this.executor).thenAccept(list -> {
                this.retentionFutureMap.putAll((Map) list.stream().map(str -> {
                    String[] split = str.split("/");
                    log.info("Adding new stream {}/{} to bucket {} during bootstrap", new Object[]{split[0], split[1], Integer.valueOf(this.bucketId)});
                    return new StreamImpl(split[0], split[1]);
                }).collect(Collectors.toMap(streamImpl -> {
                    return streamImpl;
                }, this::getStreamRetentionFuture)));
            });
        }, th -> {
            log.warn("exception thrown getting streams for bucket {}, e = {}", Integer.valueOf(this.bucketId), th);
        }, this.executor).thenAccept(r5 -> {
            log.info("streams collected for the bucket {}, registering for change notification and starting loop for processing notifications", Integer.valueOf(this.bucketId));
            this.streamMetadataStore.registerBucketChangeListener(this.bucketId, this);
        }).whenComplete((r6, th2) -> {
            if (th2 != null) {
                notifyFailed(th2);
            } else {
                notifyStarted();
                this.notificationLoop = Futures.loop(this::isRunning, this::processNotification, this.executor);
            }
            this.latch.complete(null);
        });
    }

    private CompletableFuture<Void> processNotification() {
        return CompletableFuture.runAsync(() -> {
            BucketChangeListener.StreamNotification streamNotification = (BucketChangeListener.StreamNotification) Exceptions.handleInterrupted(() -> {
                return this.notifications.poll(1L, TimeUnit.SECONDS);
            });
            if (streamNotification != null) {
                switch (AnonymousClass1.$SwitchMap$io$pravega$controller$server$retention$BucketChangeListener$StreamNotification$NotificationType[streamNotification.getType().ordinal()]) {
                    case ApiResponseMessage.ERROR /* 1 */:
                        log.info("New stream {}/{} added to bucket {} ", new Object[]{streamNotification.getScope(), streamNotification.getStream(), Integer.valueOf(this.bucketId)});
                        Stream streamImpl = new StreamImpl(streamNotification.getScope(), streamNotification.getStream());
                        this.retentionFutureMap.computeIfAbsent(streamImpl, stream -> {
                            return getStreamRetentionFuture(streamImpl);
                        });
                        return;
                    case ApiResponseMessage.WARNING /* 2 */:
                        log.info("Stream {}/{} removed from bucket {} ", new Object[]{streamNotification.getScope(), streamNotification.getStream(), Integer.valueOf(this.bucketId)});
                        this.retentionFutureMap.remove(new StreamImpl(streamNotification.getScope(), streamNotification.getStream())).cancel(true);
                        return;
                    case ApiResponseMessage.INFO /* 3 */:
                    default:
                        return;
                    case ApiResponseMessage.OK /* 4 */:
                        log.info("Retention.StreamNotification for connectivity error");
                        return;
                }
            }
        }, this.executor);
    }

    private CompletableFuture<Void> getStreamRetentionFuture(StreamImpl streamImpl) {
        long millis = Duration.ofMinutes(Config.MINIMUM_RETENTION_FREQUENCY_IN_MINUTES).toMillis();
        return Futures.delayedFuture(() -> {
            return performRetention(streamImpl);
        }, ThreadLocalRandom.current().nextLong(millis), this.executor).thenCompose(r10 -> {
            return RetryHelper.loopWithDelay(this::isRunning, () -> {
                return performRetention(streamImpl);
            }, millis, this.executor);
        });
    }

    private CompletableFuture<Void> performRetention(StreamImpl streamImpl) {
        log.debug("Periodic background processing for retention called for stream {}/{}", streamImpl.getScope(), streamImpl.getStreamName());
        OperationContext createContext = this.streamMetadataStore.createContext(streamImpl.getScope(), streamImpl.getStreamName());
        return RetryHelper.withRetriesAsync(() -> {
            return this.streamMetadataStore.getConfiguration(streamImpl.getScope(), streamImpl.getStreamName(), createContext, this.executor).thenCompose(streamConfiguration -> {
                return this.streamMetadataTasks.retention(streamImpl.getScope(), streamImpl.getStreamName(), streamConfiguration.getRetentionPolicy(), System.currentTimeMillis(), createContext, this.streamMetadataTasks.retrieveDelegationToken());
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.warn("Exception thrown while performing auto retention for stream {} ", streamImpl, th);
                throw new CompletionException(th);
            });
        }, RetryHelper.UNCONDITIONAL_PREDICATE, 5, this.executor).exceptionally(th -> {
            log.warn("Unable to perform retention for stream {}. Ignoring, retention will be attempted in next cycle.", streamImpl, th);
            return null;
        });
    }

    protected void doStop() {
        Futures.await(this.latch);
        if (this.notificationLoop != null) {
            this.notificationLoop.thenAccept(r4 -> {
                this.retentionFutureMap.forEach((stream, completableFuture) -> {
                    completableFuture.cancel(true);
                });
                this.streamMetadataStore.unregisterBucketListener(this.bucketId);
            }).whenComplete((r42, th) -> {
                if (th != null) {
                    notifyFailed(th);
                } else {
                    notifyStopped();
                }
            });
        } else {
            notifyStopped();
        }
    }

    @Override // io.pravega.controller.server.retention.BucketChangeListener
    public void notify(BucketChangeListener.StreamNotification streamNotification) {
        this.notifications.add(streamNotification);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public int getBucketId() {
        return this.bucketId;
    }

    @VisibleForTesting
    Map<Stream, CompletableFuture<Void>> getRetentionFutureMap() {
        return Collections.unmodifiableMap(this.retentionFutureMap);
    }
}
