package io.pravega.controller.server.eventProcessor.requesthandlers;

import com.google.common.base.Preconditions;
import io.pravega.client.control.impl.ModelHelper;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.retryable.RetryableException;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.CreateReaderGroupEvent;
import io.pravega.shared.controller.event.RGStreamCutRecord;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/requesthandlers/CreateReaderGroupTask.class */
/**
 * Request handler for {@link CreateReaderGroupEvent}.
 *
 * <p>For each event it looks up the reader group id currently held by the metadata store. When that id
 * matches the id carried by the event and reader group creation is not yet complete, it invokes
 * {@link StreamMetadataTasks#createReaderGroupTasks}; otherwise the event is a no-op.
 */
public class CreateReaderGroupTask implements ReaderGroupTask<CreateReaderGroupEvent> {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(CreateReaderGroupTask.class));

    private final StreamMetadataStore streamMetadataStore;
    private final StreamMetadataTasks streamMetadataTasks;
    private final ScheduledExecutorService executor;

    /**
     * Creates the task. All three collaborators are mandatory.
     *
     * @param streamMetadataTasks      tasks helper used to check for and perform reader group creation.
     * @param streamMetadataStore      metadata store used to build the operation context and read the reader group id.
     * @param scheduledExecutorService executor handed to the store lookup and to the retry helper.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public CreateReaderGroupTask(StreamMetadataTasks streamMetadataTasks, StreamMetadataStore streamMetadataStore, ScheduledExecutorService scheduledExecutorService) {
        // Validation order (store, tasks, executor) is kept the same as the assignments below.
        this.streamMetadataStore = Preconditions.checkNotNull(streamMetadataStore);
        this.streamMetadataTasks = Preconditions.checkNotNull(streamMetadataTasks);
        this.executor = Preconditions.checkNotNull(scheduledExecutorService);
    }

    /**
     * Processes a single create-reader-group event.
     *
     * <p>The whole lookup-and-create sequence is submitted to {@link RetryHelper#withRetriesAsync} with a
     * predicate that accepts failures whose unwrapped form is a {@link RetryableException} and with
     * {@link Integer#MAX_VALUE} as the retry count argument.
     *
     * @param createReaderGroupEvent the event describing the reader group to create.
     * @return a future that completes once the event has been handled or skipped.
     */
    @Override
    public CompletableFuture<Void> execute(CreateReaderGroupEvent createReaderGroupEvent) {
        final String scope = createReaderGroupEvent.getScope();
        final String readerGroup = createReaderGroupEvent.getRgName();
        final UUID expectedId = createReaderGroupEvent.getReaderGroupId();
        final ReaderGroupConfig config = getConfigFromEvent(createReaderGroupEvent);
        final long requestId = createReaderGroupEvent.getRequestId();
        final OperationContext context = streamMetadataStore.createRGContext(scope, readerGroup, requestId);

        return RetryHelper.withRetriesAsync(
                () -> streamMetadataStore.getReaderGroupId(scope, readerGroup, context, executor)
                        .thenCompose(storedId -> {
                            if (!storedId.equals(expectedId)) {
                                // The store holds a different id than the event: nothing to do for this event.
                                // The empty argument array is passed explicitly, exactly as the original call did.
                                log.warn(requestId, "Skipping processing of CreateReaderGroupEvent with stale UUID.", new Object[0]);
                                return CompletableFuture.completedFuture(null);
                            }
                            return createIfIncomplete(scope, readerGroup, config, createReaderGroupEvent, context);
                        }),
                failure -> Exceptions.unwrap(failure) instanceof RetryableException,
                Integer.MAX_VALUE,
                executor);
    }

    /**
     * Runs the reader group creation tasks unless creation is already reported as complete.
     *
     * @param scope       scope of the reader group.
     * @param readerGroup name of the reader group.
     * @param config      configuration rebuilt from the event.
     * @param event       the originating event; its create timestamp is read only if creation is needed.
     * @param context     operation context for the metadata calls.
     * @return a future that completes when creation has finished or was found to be unnecessary.
     */
    private CompletableFuture<Void> createIfIncomplete(String scope, String readerGroup, ReaderGroupConfig config,
                                                       CreateReaderGroupEvent event, OperationContext context) {
        return streamMetadataTasks.isRGCreationComplete(scope, readerGroup, context).thenCompose(complete -> {
            if (complete) {
                return CompletableFuture.completedFuture(null);
            }
            return Futures.toVoid(streamMetadataTasks.createReaderGroupTasks(
                    scope, readerGroup, config, event.getCreateTimeStamp(), context));
        });
    }

    /**
     * Rebuilds a {@link ReaderGroupConfig} from the fields carried by the event, then clones it with the
     * event's reader group id and generation.
     *
     * @param createReaderGroupEvent the event to read the configuration fields from.
     * @return the configuration produced by {@link ReaderGroupConfig#cloneConfig}.
     */
    private ReaderGroupConfig getConfigFromEvent(CreateReaderGroupEvent createReaderGroupEvent) {
        // Starting cuts are converted before the builder chain begins; ending cuts are converted inside it.
        Map<Stream, StreamCut> startingCuts = getStreamCutMapFromRecord(createReaderGroupEvent.getStartingStreamCuts());

        ReaderGroupConfig baseConfig = ReaderGroupConfig.builder()
                .groupRefreshTimeMillis(createReaderGroupEvent.getGroupRefreshTimeMillis())
                .automaticCheckpointIntervalMillis(createReaderGroupEvent.getAutomaticCheckpointIntervalMillis())
                .maxOutstandingCheckpointRequest(createReaderGroupEvent.getMaxOutstandingCheckpointRequest())
                // The event carries the retention type as an enum ordinal.
                .retentionType(ReaderGroupConfig.StreamDataRetention.values()[createReaderGroupEvent.getRetentionTypeOrdinal()])
                .startingStreamCuts(startingCuts)
                .endingStreamCuts(getStreamCutMapFromRecord(createReaderGroupEvent.getEndingStreamCuts()))
                .build();

        return ReaderGroupConfig.cloneConfig(
                baseConfig,
                createReaderGroupEvent.getReaderGroupId(),
                createReaderGroupEvent.getGeneration());
    }

    /**
     * Converts a map of stream-name keys and stream cut records into a map of {@link Stream} to {@link StreamCut}.
     *
     * @param map records keyed by the string that {@link Stream#of(String)} is given.
     * @return a new map with one entry per input entry.
     */
    private Map<Stream, StreamCut> getStreamCutMapFromRecord(Map<String, RGStreamCutRecord> map) {
        return map.entrySet().stream().collect(Collectors.toMap(
                entry -> Stream.of(entry.getKey()),
                entry -> toStreamCut(entry.getKey(), entry.getValue())));
    }

    /**
     * Builds the {@link StreamCut} for one record.
     *
     * @param streamKey the map key identifying the stream.
     * @param record    the record holding the stream cut data.
     * @return the stream cut generated by {@link ModelHelper#generateStreamCut}.
     */
    private StreamCut toStreamCut(String streamKey, RGStreamCutRecord record) {
        // NOTE(review): assumes Stream.of is a pure factory, so resolving the key once is equivalent - confirm.
        Stream stream = Stream.of(streamKey);
        return ModelHelper.generateStreamCut(stream.getScope(), stream.getStreamName(), record.getStreamCut());
    }
}
