package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.retryable.RetryableException;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.ReaderGroupConfigRecord;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.NameUtils;
import io.pravega.shared.controller.event.UpdateReaderGroupEvent;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/UpdateReaderGroupTask.class */
/**
 * Request handler that processes an {@link UpdateReaderGroupEvent}.
 *
 * <p>The handler looks up the reader group's stored id and configuration record and, when the
 * event still applies to them, optionally adjusts per-stream subscriber entries before marking
 * the configuration update as complete in the {@link StreamMetadataStore}.
 */
public class UpdateReaderGroupTask implements ReaderGroupTask<UpdateReaderGroupEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(UpdateReaderGroupTask.class));

    private final StreamMetadataStore streamMetadataStore;
    // Stored for callers that construct this task with it; not referenced by any method of this class.
    private final StreamMetadataTasks streamMetadataTasks;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task.
     *
     * @param streamMetadataTasks      stream metadata tasks; must not be null
     * @param streamMetadataStore      store used for all reader-group and subscriber operations; must not be null
     * @param scheduledExecutorService executor handed to every asynchronous store call and loop; must not be null
     * @throws NullPointerException if any argument is null
     */
    public UpdateReaderGroupTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        // The message names the offending argument so a failure is self-explanatory.
        Preconditions.checkNotNull(streamMetadataStore, "streamMetadataStore");
        Preconditions.checkNotNull(streamMetadataTasks, "streamMetadataTasks");
        Preconditions.checkNotNull(scheduledExecutorService, "scheduledExecutorService");
        this.streamMetadataStore = streamMetadataStore;
        this.streamMetadataTasks = streamMetadataTasks;
        this.executor = scheduledExecutorService;
    }

    /**
     * Processes a reader-group update event.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If the reader-group id held by the store differs from the id in the event, a warning is
     *       logged and nothing else happens.</li>
     *   <li>If the generation of the stored configuration record differs from the event's generation,
     *       a warning is logged and nothing else happens.</li>
     *   <li>If the stored record is not flagged as updating, nothing happens.</li>
     *   <li>If the event does not flag a subscriber transition, the configuration update is completed
     *       directly.</li>
     *   <li>Otherwise the reader group is deleted as a subscriber from every stream listed in the
     *       event's remove-streams set; then, unless the record's retention type is {@code NONE}, it
     *       is added as a subscriber to every stream keyed in the record's starting stream cuts; and
     *       finally the configuration update is completed.</li>
     * </ol>
     *
     * <p>The whole sequence is re-run through {@link RetryHelper#withRetriesAsync} whenever it fails
     * with a cause that unwraps to a {@link RetryableException}. {@code Integer.MAX_VALUE} is passed
     * as the retry bound (presumably the maximum number of attempts; confirm against RetryHelper).
     *
     * @param updateReaderGroupEvent the event to process
     * @return a future that completes once processing has finished or has been skipped
     */
    @Override // io.pravega.controller.server.eventProcessor.requesthandlers.ReaderGroupTask
    public CompletableFuture<Void> execute(UpdateReaderGroupEvent updateReaderGroupEvent) {
        final String scope = updateReaderGroupEvent.getScope();
        final String rgName = updateReaderGroupEvent.getRgName();
        final long requestId = updateReaderGroupEvent.getRequestId();
        final long generation = updateReaderGroupEvent.getGeneration();
        final UUID readerGroupId = updateReaderGroupEvent.getReaderGroupId();
        final boolean isTransitionToFromSubscriber = updateReaderGroupEvent.isTransitionToFromSubscriber();
        // Wildcard instead of a raw type; elements are cast to String where they are consumed.
        final ImmutableSet<?> removeStreams = updateReaderGroupEvent.getRemoveStreams();
        final OperationContext context = this.streamMetadataStore.createRGContext(scope, rgName, requestId);

        return RetryHelper.withRetriesAsync(
                () -> this.streamMetadataStore.getReaderGroupId(scope, rgName, context, this.executor).thenCompose(storedId -> {
                    if (!storedId.equals(readerGroupId)) {
                        log.warn(requestId, "Skipping processing of Reader Group update request {} as UUID did not match.", requestId);
                        return CompletableFuture.completedFuture(null);
                    }
                    return this.streamMetadataStore.getReaderGroupConfigRecord(scope, rgName, context, this.executor).thenCompose(versionedConfig -> {
                        // Read the record once; the cast is harmless if the store already returns the typed record.
                        final ReaderGroupConfigRecord configRecord = (ReaderGroupConfigRecord) versionedConfig.getObject();
                        if (configRecord.getGeneration() != generation) {
                            log.warn(requestId, "Skipping processing of Reader Group update request as generation did not match.");
                            return CompletableFuture.completedFuture(null);
                        }
                        if (!configRecord.isUpdating()) {
                            return CompletableFuture.completedFuture(null);
                        }
                        if (!isTransitionToFromSubscriber) {
                            return this.streamMetadataStore.completeRGConfigUpdate(scope, rgName, versionedConfig, context, this.executor);
                        }
                        final String scopedReaderGroupName = NameUtils.getScopedReaderGroupName(scope, rgName);
                        return removeSubscribers(removeStreams, scopedReaderGroupName, configRecord, context)
                                .thenCompose(ignored -> addSubscribersUnlessNoRetention(configRecord, scopedReaderGroupName, context))
                                .thenCompose(ignored -> this.streamMetadataStore.completeRGConfigUpdate(scope, rgName, versionedConfig, context, this.executor));
                    });
                }),
                failure -> Exceptions.unwrap(failure) instanceof RetryableException,
                Integer.MAX_VALUE,
                this.executor);
    }

    /**
     * Deletes the reader group as a subscriber of every stream in {@code removeStreams}, one stream at a time.
     */
    private CompletableFuture<Void> removeSubscribers(ImmutableSet<?> removeStreams, String scopedReaderGroupName,
                                                      ReaderGroupConfigRecord configRecord, OperationContext context) {
        final Iterator<?> pending = removeStreams.iterator();
        return Futures.loop(pending::hasNext, () -> {
            final Stream stream = Stream.of((String) pending.next());
            return this.streamMetadataStore.deleteSubscriber(stream.getScope(), stream.getStreamName(), scopedReaderGroupName,
                    configRecord.getGeneration(), context, this.executor);
        }, this.executor);
    }

    /**
     * Adds the reader group as a subscriber of every stream keyed in the record's starting stream cuts,
     * one stream at a time; does nothing when the record's retention type is {@code NONE}.
     */
    private CompletableFuture<Void> addSubscribersUnlessNoRetention(ReaderGroupConfigRecord configRecord, String scopedReaderGroupName,
                                                                    OperationContext context) {
        // The record stores the retention type as an ordinal; map it back to the enum constant.
        final ReaderGroupConfig.StreamDataRetention retention =
                ReaderGroupConfig.StreamDataRetention.values()[configRecord.getRetentionTypeOrdinal()];
        if (ReaderGroupConfig.StreamDataRetention.NONE.equals(retention)) {
            return CompletableFuture.completedFuture(null);
        }
        final Iterator<String> pending = configRecord.getStartingStreamCuts().keySet().iterator();
        return Futures.loop(pending::hasNext, () -> {
            final Stream stream = Stream.of(pending.next());
            return this.streamMetadataStore.addSubscriber(stream.getScope(), stream.getStreamName(), scopedReaderGroupName,
                    configRecord.getGeneration(), context, this.executor);
        }, this.executor);
    }
}
