package io.pravega.controller.server.eventProcessor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientFactory;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.netty.impl.ConnectionFactory;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.client.stream.impl.Controller;
import io.pravega.client.stream.impl.JavaSerializer;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.eventProcessor.EventProcessorSystem;
import io.pravega.controller.eventProcessor.ExceptionHandler;
import io.pravega.controller.eventProcessor.impl.ConcurrentEventProcessor;
import io.pravega.controller.eventProcessor.impl.EventProcessorGroupConfigImpl;
import io.pravega.controller.eventProcessor.impl.EventProcessorSystemImpl;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.server.SegmentHelper;
import io.pravega.controller.server.eventProcessor.requesthandlers.AbortRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.AutoScaleTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.DeleteStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.ScaleOperationTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.SealStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.StreamRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.TruncateStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.UpdateStreamTask;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.controller.store.host.HostControllerStore;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/ControllerEventProcessors.class */
public class ControllerEventProcessors extends AbstractIdleService implements FailoverSweeper {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ControllerEventProcessors.class);
    public static final Serializer<CommitEvent> COMMIT_EVENT_SERIALIZER = new JavaSerializer();
    public static final Serializer<AbortEvent> ABORT_EVENT_SERIALIZER = new JavaSerializer();
    public static final Serializer<ControllerEvent> CONTROLLER_EVENT_SERIALIZER = new JavaSerializer();
    private static final long DELAY = 100;
    private static final int MULTIPLIER = 10;
    private static final long MAX_DELAY = 10000;
    private final String objectId;
    private final ControllerEventProcessorConfig config;
    private final CheckpointStore checkpointStore;
    private final EventProcessorSystem system;
    private final Controller controller;
    private final ClientFactory clientFactory;
    private final ScheduledExecutorService executor;
    private EventProcessorGroup<CommitEvent> commitEventProcessors;
    private EventProcessorGroup<AbortEvent> abortEventProcessors;
    private EventProcessorGroup<ControllerEvent> requestEventProcessors;
    private final StreamRequestHandler streamRequestHandler;
    private final CommitEventProcessor commitEventProcessor;
    private final AbortRequestHandler abortRequestHandler;

    public ControllerEventProcessors(String str, ControllerEventProcessorConfig controllerEventProcessorConfig, Controller controller, CheckpointStore checkpointStore, StreamMetadataStore streamMetadataStore, HostControllerStore hostControllerStore, SegmentHelper segmentHelper, ConnectionFactory connectionFactory, StreamMetadataTasks streamMetadataTasks, ScheduledExecutorService scheduledExecutorService) {
        this(str, controllerEventProcessorConfig, controller, checkpointStore, streamMetadataStore, hostControllerStore, segmentHelper, connectionFactory, streamMetadataTasks, null, scheduledExecutorService);
    }

    @VisibleForTesting
    ControllerEventProcessors(String str, ControllerEventProcessorConfig controllerEventProcessorConfig, Controller controller, CheckpointStore checkpointStore, StreamMetadataStore streamMetadataStore, HostControllerStore hostControllerStore, SegmentHelper segmentHelper, ConnectionFactory connectionFactory, StreamMetadataTasks streamMetadataTasks, EventProcessorSystem eventProcessorSystem, ScheduledExecutorService scheduledExecutorService) {
        this.objectId = "ControllerEventProcessors";
        this.config = controllerEventProcessorConfig;
        this.checkpointStore = checkpointStore;
        this.controller = controller;
        this.clientFactory = new ClientFactoryImpl(controllerEventProcessorConfig.getScopeName(), controller, connectionFactory);
        this.system = eventProcessorSystem == null ? new EventProcessorSystemImpl("Controller", str, controllerEventProcessorConfig.getScopeName(), this.clientFactory, new ReaderGroupManagerImpl(controllerEventProcessorConfig.getScopeName(), controller, this.clientFactory, connectionFactory)) : eventProcessorSystem;
        this.streamRequestHandler = new StreamRequestHandler(new AutoScaleTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new ScaleOperationTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new UpdateStreamTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new SealStreamTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new DeleteStreamTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new TruncateStreamTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), scheduledExecutorService);
        this.commitEventProcessor = new CommitEventProcessor(streamMetadataStore, streamMetadataTasks, hostControllerStore, scheduledExecutorService, segmentHelper, connectionFactory);
        this.abortRequestHandler = new AbortRequestHandler(streamMetadataStore, streamMetadataTasks, hostControllerStore, scheduledExecutorService, segmentHelper, connectionFactory);
        this.executor = scheduledExecutorService;
    }

    protected void startUp() throws Exception {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp", new Object[0]);
        try {
            log.info("Starting controller event processors");
            initialize();
            log.info("Controller event processors startUp complete");
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    protected void shutDown() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown", new Object[0]);
        try {
            log.info("Stopping controller event processors");
            stopEventProcessors();
            log.info("Controller event processors shutDown complete");
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    @Override // io.pravega.controller.fault.FailoverSweeper
    public boolean isReady() {
        return isRunning();
    }

    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> supplier) {
        ArrayList arrayList = new ArrayList();
        if (this.commitEventProcessors != null) {
            arrayList.add(handleOrphanedReaders(this.commitEventProcessors, supplier));
        }
        if (this.abortEventProcessors != null) {
            arrayList.add(handleOrphanedReaders(this.abortEventProcessors, supplier));
        }
        if (this.requestEventProcessors != null) {
            arrayList.add(handleOrphanedReaders(this.requestEventProcessors, supplier));
        }
        return Futures.allOf(arrayList);
    }

    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> handleFailedProcess(String str) {
        ArrayList arrayList = new ArrayList();
        if (this.commitEventProcessors != null) {
            arrayList.add(RetryHelper.withRetriesAsync(() -> {
                return CompletableFuture.runAsync(() -> {
                    try {
                        this.commitEventProcessors.notifyProcessFailure(str);
                    } catch (CheckpointStoreException e) {
                        throw new CompletionException(e);
                    }
                }, this.executor);
            }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));
        }
        if (this.abortEventProcessors != null) {
            arrayList.add(RetryHelper.withRetriesAsync(() -> {
                return CompletableFuture.runAsync(() -> {
                    try {
                        this.abortEventProcessors.notifyProcessFailure(str);
                    } catch (CheckpointStoreException e) {
                        throw new CompletionException(e);
                    }
                }, this.executor);
            }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));
        }
        if (this.requestEventProcessors != null) {
            arrayList.add(RetryHelper.withRetriesAsync(() -> {
                return CompletableFuture.runAsync(() -> {
                    try {
                        this.requestEventProcessors.notifyProcessFailure(str);
                    } catch (CheckpointStoreException e) {
                        throw new CompletionException(e);
                    }
                }, this.executor);
            }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));
        }
        return Futures.allOf(arrayList);
    }

    private CompletableFuture<Void> createStreams() {
        StreamConfiguration build = StreamConfiguration.builder().scope(this.config.getScopeName()).streamName(this.config.getCommitStreamName()).scalingPolicy(this.config.getCommitStreamScalingPolicy()).build();
        StreamConfiguration build2 = StreamConfiguration.builder().scope(this.config.getScopeName()).streamName(this.config.getAbortStreamName()).scalingPolicy(this.config.getAbortStreamScalingPolicy()).build();
        StreamConfiguration build3 = StreamConfiguration.builder().scope(this.config.getScopeName()).streamName(Config.SCALE_STREAM_NAME).scalingPolicy(ScalingPolicy.fixed(1)).build();
        return createScope(this.config.getScopeName()).thenCompose(r10 -> {
            return CompletableFuture.allOf(createStream(build), createStream(build2), createStream(build3));
        });
    }

    private CompletableFuture<Void> createScope(String str) {
        return Futures.toVoid(Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, th -> {
            log.warn("Error creating event processor scope " + str, th);
        }).runAsync(() -> {
            return this.controller.createScope(str).thenAccept(bool -> {
                log.info("Created controller scope {}", str);
            });
        }, this.executor));
    }

    private CompletableFuture<Void> createStream(StreamConfiguration streamConfiguration) {
        return Futures.toVoid(Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, th -> {
            log.warn("Error creating event processor stream " + streamConfiguration.getStreamName(), th);
        }).runAsync(() -> {
            return this.controller.createStream(streamConfiguration).thenAccept(bool -> {
                log.info("Created stream {}/{}", streamConfiguration.getScope(), streamConfiguration.getStreamName());
            });
        }, this.executor));
    }

    public CompletableFuture<Void> bootstrap(StreamTransactionMetadataTasks streamTransactionMetadataTasks, StreamMetadataTasks streamMetadataTasks) {
        log.info("Bootstrapping controller event processors");
        return createStreams().thenAcceptAsync(r7 -> {
            streamMetadataTasks.initializeStreamWriters(this.clientFactory, this.config.getRequestStreamName());
            streamTransactionMetadataTasks.initializeStreamWriters(this.clientFactory, this.config);
        }, (Executor) this.executor);
    }

    private CompletableFuture<Void> handleOrphanedReaders(EventProcessorGroup<? extends ControllerEvent> eventProcessorGroup, Supplier<Set<String>> supplier) {
        return RetryHelper.withRetriesAsync(() -> {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return eventProcessorGroup.getProcesses();
                } catch (CheckpointStoreException e) {
                    if (e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                        return Collections.emptySet();
                    }
                    throw new CompletionException(e);
                }
            }, this.executor);
        }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor).thenComposeAsync(set -> {
            return RetryHelper.withRetriesAsync(() -> {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return new ImmutablePair(supplier.get(), set);
                    } catch (Exception e) {
                        log.error(String.format("Error fetching current processes%s", eventProcessorGroup.toString()), e);
                        throw new CompletionException(e);
                    }
                }, this.executor);
            }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor);
        }).thenComposeAsync(immutablePair -> {
            Set set2 = (Set) immutablePair.getLeft();
            Set<String> set3 = (Set) immutablePair.getRight();
            if (set3 == null || set3.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            if (set2 != null) {
                set3.removeAll(set2);
            }
            ArrayList arrayList = new ArrayList();
            for (String str : set3) {
                arrayList.add(RetryHelper.withRetriesAsync(() -> {
                    return CompletableFuture.runAsync(() -> {
                        try {
                            eventProcessorGroup.notifyProcessFailure(str);
                        } catch (CheckpointStoreException e) {
                            log.error(String.format("Error notifying failure of process=%s in event processor group %s", str, eventProcessorGroup.toString()), e);
                            throw new CompletionException(e);
                        }
                    }, this.executor);
                }, RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, this.executor));
            }
            return Futures.allOf(arrayList);
        });
    }

    private void initialize() throws Exception {
        EventProcessorConfig build = EventProcessorConfig.builder().config(EventProcessorGroupConfigImpl.builder().streamName(this.config.getCommitStreamName()).readerGroupName(this.config.getCommitReaderGroupName()).eventProcessorCount(this.config.getCommitReaderGroupSize()).checkpointConfig(this.config.getCommitCheckpointConfig()).build()).decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER).serializer(COMMIT_EVENT_SERIALIZER).supplier(() -> {
            return this.commitEventProcessor;
        }).build();
        log.info("Creating commit event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, th -> {
            log.warn("Error creating commit event processor group", th);
        }).run(() -> {
            this.commitEventProcessors = this.system.createEventProcessorGroup(build, this.checkpointStore);
            return null;
        });
        EventProcessorConfig build2 = EventProcessorConfig.builder().config(EventProcessorGroupConfigImpl.builder().streamName(this.config.getAbortStreamName()).readerGroupName(this.config.getAbortReaderGroupName()).eventProcessorCount(this.config.getAbortReaderGroupSize()).checkpointConfig(this.config.getAbortCheckpointConfig()).build()).decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER).serializer(ABORT_EVENT_SERIALIZER).supplier(() -> {
            return new ConcurrentEventProcessor(this.abortRequestHandler, this.executor);
        }).build();
        log.info("Creating abort event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, th2 -> {
            log.warn("Error creating commit event processor group", th2);
        }).run(() -> {
            this.abortEventProcessors = this.system.createEventProcessorGroup(build2, this.checkpointStore);
            return null;
        });
        EventProcessorConfig build3 = EventProcessorConfig.builder().config(EventProcessorGroupConfigImpl.builder().streamName(this.config.getRequestStreamName()).readerGroupName(this.config.getRequestReaderGroupName()).eventProcessorCount(1).checkpointConfig(this.config.getRequestStreamCheckpointConfig()).build()).decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER).serializer(CONTROLLER_EVENT_SERIALIZER).supplier(() -> {
            return new ConcurrentEventProcessor(this.streamRequestHandler, this.executor);
        }).build();
        log.info("Creating request event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, th3 -> {
            log.warn("Error creating request event processor group", th3);
        }).run(() -> {
            this.requestEventProcessors = this.system.createEventProcessorGroup(build3, this.checkpointStore);
            return null;
        });
        log.info("Awaiting start of commit event processors");
        this.commitEventProcessors.awaitRunning();
        log.info("Awaiting start of abort event processors");
        this.abortEventProcessors.awaitRunning();
        log.info("Awaiting start of request event processors");
        this.requestEventProcessors.awaitRunning();
    }

    private void stopEventProcessors() {
        if (this.commitEventProcessors != null) {
            log.info("Stopping commit event processors");
            this.commitEventProcessors.stopAsync();
        }
        if (this.abortEventProcessors != null) {
            log.info("Stopping abort event processors");
            this.abortEventProcessors.stopAsync();
        }
        if (this.requestEventProcessors != null) {
            log.info("Stopping request event processors");
            this.requestEventProcessors.stopAsync();
        }
        if (this.commitEventProcessors != null) {
            log.info("Awaiting termination of commit event processors");
            this.commitEventProcessors.awaitTerminated();
        }
        if (this.abortEventProcessors != null) {
            log.info("Awaiting termination of abort event processors");
            this.abortEventProcessors.awaitTerminated();
        }
        if (this.requestEventProcessors != null) {
            log.info("Awaiting termination of request event processors");
            this.requestEventProcessors.awaitTerminated();
        }
    }
}
