package io.pravega.controller.server.eventProcessor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.Retry;
import io.pravega.controller.eventProcessor.CheckpointConfig;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.eventProcessor.EventProcessorSystem;
import io.pravega.controller.eventProcessor.EventSerializer;
import io.pravega.controller.eventProcessor.ExceptionHandler;
import io.pravega.controller.eventProcessor.impl.ConcurrentEventProcessor;
import io.pravega.controller.eventProcessor.impl.EventProcessorGroupConfigImpl;
import io.pravega.controller.eventProcessor.impl.EventProcessorSystemImpl;
import io.pravega.controller.fault.FailoverSweeper;
import io.pravega.controller.server.eventProcessor.requesthandlers.AbortRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.AutoScaleTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.CommitRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.CreateReaderGroupTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.DeleteReaderGroupTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.DeleteStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.ScaleOperationTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.SealStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.StreamRequestHandler;
import io.pravega.controller.server.eventProcessor.requesthandlers.TruncateStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.UpdateReaderGroupTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.UpdateStreamTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.kvtable.CreateTableTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.kvtable.DeleteTableTask;
import io.pravega.controller.server.eventProcessor.requesthandlers.kvtable.TableRequestHandler;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.controller.store.kvtable.KVTableMetadataStore;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.task.KeyValueTable.TableMetadataTasks;
import io.pravega.controller.task.Stream.StreamMetadataTasks;
import io.pravega.controller.task.Stream.StreamTransactionMetadataTasks;
import io.pravega.controller.util.Config;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.AbortEvent;
import io.pravega.shared.controller.event.CommitEvent;
import io.pravega.shared.controller.event.ControllerEvent;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/eventProcessor/ControllerEventProcessors.class */
public class ControllerEventProcessors extends AbstractIdleService implements FailoverSweeper, AutoCloseable {
    // Retry back-off parameters handed to Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY, ...)
    // when creating the scope, the streams and the event processor groups.
    // NOTE(review): DELAY and MAX_DELAY are presumably milliseconds - confirm against Retry.
    private static final long DELAY = 100;
    private static final int MULTIPLIER = 10;
    private static final long MAX_DELAY = 10000;
    // Identifier passed to LoggerHelpers for trace enter/leave logging; fixed to "ControllerEventProcessors" by the constructor.
    private final String objectId;
    // Source of stream names, reader group names, scaling policies and timeouts used throughout this class.
    private final ControllerEventProcessorConfig config;
    // Consulted for health (isMetadataServiceConnected) and for reader positions when truncating.
    private final CheckpointStore checkpointStore;
    // Factory for the event processor groups; either injected (tests) or built by the constructor.
    private final EventProcessorSystem system;
    // Used to create the scope and the internal streams.
    private final Controller controller;
    // Client factory created by the constructor; closed in shutDown() and in close().
    private final ClientFactoryImpl clientFactory;
    // Executor shared by the request handlers, retries and asynchronous stages in this class.
    private final ScheduledExecutorService executor;
    // Event processor groups; they stay null until initialize() has created them.
    private EventProcessorGroup<CommitEvent> commitEventProcessors;
    private EventProcessorGroup<AbortEvent> abortEventProcessors;
    private EventProcessorGroup<ControllerEvent> requestEventProcessors;
    private EventProcessorGroup<ControllerEvent> kvtRequestEventProcessors;
    // Request handlers wrapped in ConcurrentEventProcessor instances by initialize().
    private final StreamRequestHandler streamRequestHandler;
    private final CommitRequestHandler commitRequestHandler;
    private final AbortRequestHandler abortRequestHandler;
    private final TableRequestHandler kvtRequestHandler;
    // Value of config.getRebalanceIntervalMillis(), applied to every event processor config as minRebalanceIntervalMillis.
    private final long rebalanceIntervalMillis;
    // Delay used between truncation attempts; starts at TRUNCATION_INTERVAL_MILLIS and is read once by bootstrap().
    private final AtomicLong truncationInterval;
    // Single-threaded executor created in startUp(), passed to every event processor group and shut down in shutDown().
    private ScheduledExecutorService rebalanceExecutor;
    // Flipped to true at the end of bootstrap(); contributes to isReady().
    private final AtomicBoolean bootstrapCompleted;

    // Class logger; the annotations below mark it as generated code.
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ControllerEventProcessors.class);
    // Serializers handed to the event processor configurations built in initialize().
    public static final EventSerializer<CommitEvent> COMMIT_EVENT_SERIALIZER = new EventSerializer<>();
    public static final EventSerializer<AbortEvent> ABORT_EVENT_SERIALIZER = new EventSerializer<>();
    public static final EventSerializer<ControllerEvent> CONTROLLER_EVENT_SERIALIZER = new EventSerializer<>();
    // Default truncation interval: ten minutes, expressed in milliseconds.
    private static final long TRUNCATION_INTERVAL_MILLIS = Duration.ofMinutes(10).toMillis();

    /**
     * Creates the event processors without a pre-built {@link EventProcessorSystem}; delegates to the
     * full constructor with a {@code null} system so that a default one is created there.
     */
    public ControllerEventProcessors(String str,
                                     ControllerEventProcessorConfig controllerEventProcessorConfig,
                                     Controller controller,
                                     CheckpointStore checkpointStore,
                                     StreamMetadataStore streamMetadataStore,
                                     BucketStore bucketStore,
                                     ConnectionPool connectionPool,
                                     StreamMetadataTasks streamMetadataTasks,
                                     StreamTransactionMetadataTasks streamTransactionMetadataTasks,
                                     KVTableMetadataStore kVTableMetadataStore,
                                     TableMetadataTasks tableMetadataTasks,
                                     ScheduledExecutorService scheduledExecutorService) {
        this(str,
                controllerEventProcessorConfig,
                controller,
                checkpointStore,
                streamMetadataStore,
                bucketStore,
                connectionPool,
                streamMetadataTasks,
                streamTransactionMetadataTasks,
                kVTableMetadataStore,
                tableMetadataTasks,
                null,
                scheduledExecutorService);
    }

    @VisibleForTesting
    public ControllerEventProcessors(String str, ControllerEventProcessorConfig controllerEventProcessorConfig, Controller controller, CheckpointStore checkpointStore, StreamMetadataStore streamMetadataStore, BucketStore bucketStore, ConnectionPool connectionPool, StreamMetadataTasks streamMetadataTasks, StreamTransactionMetadataTasks streamTransactionMetadataTasks, KVTableMetadataStore kVTableMetadataStore, TableMetadataTasks tableMetadataTasks, EventProcessorSystem eventProcessorSystem, ScheduledExecutorService scheduledExecutorService) {
        this.bootstrapCompleted = new AtomicBoolean(false);
        this.objectId = "ControllerEventProcessors";
        this.config = controllerEventProcessorConfig;
        this.checkpointStore = checkpointStore;
        this.controller = controller;
        this.clientFactory = new ClientFactoryImpl(controllerEventProcessorConfig.getScopeName(), controller, connectionPool);
        this.system = eventProcessorSystem == null ? new EventProcessorSystemImpl("Controller", str, controllerEventProcessorConfig.getScopeName(), this.clientFactory, new ReaderGroupManagerImpl(controllerEventProcessorConfig.getScopeName(), controller, this.clientFactory)) : eventProcessorSystem;
        this.streamRequestHandler = new StreamRequestHandler(new AutoScaleTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new ScaleOperationTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new UpdateStreamTask(streamMetadataTasks, streamMetadataStore, bucketStore, scheduledExecutorService), new SealStreamTask(streamMetadataTasks, streamTransactionMetadataTasks, streamMetadataStore, scheduledExecutorService), new DeleteStreamTask(streamMetadataTasks, streamMetadataStore, bucketStore, scheduledExecutorService), new TruncateStreamTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new CreateReaderGroupTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new DeleteReaderGroupTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), new UpdateReaderGroupTask(streamMetadataTasks, streamMetadataStore, scheduledExecutorService), streamMetadataStore, scheduledExecutorService);
        this.commitRequestHandler = new CommitRequestHandler(streamMetadataStore, streamMetadataTasks, streamTransactionMetadataTasks, bucketStore, scheduledExecutorService);
        this.abortRequestHandler = new AbortRequestHandler(streamMetadataStore, streamMetadataTasks, scheduledExecutorService);
        this.kvtRequestHandler = new TableRequestHandler(new CreateTableTask(kVTableMetadataStore, tableMetadataTasks, scheduledExecutorService), new DeleteTableTask(kVTableMetadataStore, tableMetadataTasks, scheduledExecutorService), kVTableMetadataStore, scheduledExecutorService);
        this.executor = scheduledExecutorService;
        this.rebalanceIntervalMillis = controllerEventProcessorConfig.getRebalanceIntervalMillis();
        this.truncationInterval = new AtomicLong(TRUNCATION_INTERVAL_MILLIS);
    }

    /**
     * Reports whether the checkpoint store considers itself healthy.
     *
     * @return the value of {@code checkpointStore.isHealthy()}
     */
    public boolean isMetadataServiceConnected() {
        return checkpointStore.isHealthy();
    }

    /**
     * Reports whether {@code bootstrap(...)} has run to completion.
     *
     * @return the current value of the bootstrap-completed flag
     */
    public boolean isBootstrapCompleted() {
        return bootstrapCompleted.get();
    }

    /**
     * Readiness check: ready only when the metadata service is connected, bootstrap has completed
     * and the service is running. All three conditions are evaluated and logged at debug level.
     *
     * @return {@code true} when all three conditions hold
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public boolean isReady() {
        // Evaluate each condition up front (no short-circuit) so that all of them can be logged.
        final boolean metadataConnected = isMetadataServiceConnected();
        final boolean bootstrapped = isBootstrapCompleted();
        final boolean serviceRunning = isRunning();
        final boolean ready = metadataConnected && bootstrapped && serviceRunning;
        log.debug("IsReady={} as isMetaConnected={}, isBootstrapComplete={}, isSvcRunning={}",
                ready, metadataConnected, bootstrapped, serviceRunning);
        return ready;
    }

    /**
     * Service start hook: creates the rebalance executor and then creates and starts all event
     * processor groups through {@code initialize()}. Trace enter/leave is logged on every path.
     *
     * @throws Exception whatever {@code initialize()} propagates
     */
    protected void startUp() throws Exception {
        final long traceId = LoggerHelpers.traceEnterWithContext(log, objectId, "startUp");
        try {
            log.info("Starting controller event processors");
            rebalanceExecutor = ExecutorServiceHelpers.newScheduledThreadPool(1, "event-processor");
            initialize();
            log.info("Controller event processors startUp complete");
        } finally {
            LoggerHelpers.traceLeave(log, objectId, "startUp", traceId);
        }
    }

    /**
     * Service stop hook: stops every event processor group, shuts down the rebalance executor and
     * closes the client factory, in that order. Trace enter/leave is logged on every path.
     */
    protected void shutDown() {
        final long traceId = LoggerHelpers.traceEnterWithContext(log, objectId, "shutDown");
        try {
            log.info("Stopping controller event processors");
            stopEventProcessors();
            rebalanceExecutor.shutdownNow();
            clientFactory.close();
            log.info("Controller event processors shutDown complete");
        } finally {
            LoggerHelpers.traceLeave(log, objectId, "shutDown", traceId);
        }
    }

    /**
     * Sweeps orphaned readers in every event processor group that has been created so far.
     *
     * @param supplier supplies the set of processes to compare the registered processes against
     * @return a future that completes once the sweep of every existing group has completed
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> sweepFailedProcesses(Supplier<Set<String>> supplier) {
        ArrayList<CompletableFuture<Void>> sweeps = new ArrayList<>();
        // Groups are null until initialize() has created them; skip the ones not there yet.
        if (commitEventProcessors != null) {
            sweeps.add(handleOrphanedReaders(commitEventProcessors, supplier));
        }
        if (abortEventProcessors != null) {
            sweeps.add(handleOrphanedReaders(abortEventProcessors, supplier));
        }
        if (requestEventProcessors != null) {
            sweeps.add(handleOrphanedReaders(requestEventProcessors, supplier));
        }
        if (kvtRequestEventProcessors != null) {
            sweeps.add(handleOrphanedReaders(kvtRequestEventProcessors, supplier));
        }
        return Futures.allOf(sweeps);
    }

    /**
     * Notifies every event processor group that has been created so far that the given process
     * has failed. Each notification runs on the executor and is retried through
     * {@link RetryHelper#withRetriesAsync}.
     *
     * @param str identifier of the failed process
     * @return a future that completes once every existing group has been notified
     */
    @Override // io.pravega.controller.fault.FailoverSweeper
    public CompletableFuture<Void> handleFailedProcess(String str) {
        ArrayList<CompletableFuture<Void>> notifications = new ArrayList<>();
        // Groups are null until initialize() has created them; skip the ones not there yet.
        if (commitEventProcessors != null) {
            notifications.add(notifyProcessFailureWithRetries(commitEventProcessors, str));
        }
        if (abortEventProcessors != null) {
            notifications.add(notifyProcessFailureWithRetries(abortEventProcessors, str));
        }
        if (requestEventProcessors != null) {
            notifications.add(notifyProcessFailureWithRetries(requestEventProcessors, str));
        }
        if (kvtRequestEventProcessors != null) {
            notifications.add(notifyProcessFailureWithRetries(kvtRequestEventProcessors, str));
        }
        return Futures.allOf(notifications);
    }

    /**
     * Asynchronously calls {@code notifyProcessFailure(process)} on one group, retrying while
     * {@link RetryHelper#RETRYABLE_PREDICATE} accepts the failure.
     */
    private CompletableFuture<Void> notifyProcessFailureWithRetries(EventProcessorGroup<? extends ControllerEvent> group, String process) {
        return RetryHelper.withRetriesAsync(() -> CompletableFuture.runAsync(() -> {
            try {
                group.notifyProcessFailure(process);
            } catch (CheckpointStoreException e) {
                // Runnable cannot throw checked exceptions; wrap it so the retry predicate can inspect the cause.
                throw new CompletionException(e);
            }
        }, executor), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, executor);
    }

    /**
     * Creates the scope and then, concurrently, the commit, abort, request and KVT streams, each
     * with the scaling policy taken from the configuration.
     *
     * @return a future that completes when the scope and all four streams have been created
     */
    private CompletableFuture<Void> createStreams() {
        final StreamConfiguration commitStreamConfig = StreamConfiguration.builder()
                .scalingPolicy(config.getCommitStreamScalingPolicy())
                .build();
        final StreamConfiguration abortStreamConfig = StreamConfiguration.builder()
                .scalingPolicy(config.getAbortStreamScalingPolicy())
                .build();
        final StreamConfiguration requestStreamConfig = StreamConfiguration.builder()
                .scalingPolicy(config.getRequestStreamScalingPolicy())
                .build();
        final StreamConfiguration kvtStreamConfig = StreamConfiguration.builder()
                .scalingPolicy(config.getKvtStreamScalingPolicy())
                .build();
        final String scope = config.getScopeName();

        // NOTE(review): the request stream is created under Config.SCALE_STREAM_NAME, whereas the rest
        // of this class addresses it through config.getRequestStreamName() - confirm the two always match.
        return createScope(scope).thenCompose(ignored -> CompletableFuture.allOf(
                createStream(scope, config.getCommitStreamName(), commitStreamConfig),
                createStream(scope, config.getAbortStreamName(), abortStreamConfig),
                createStream(scope, Config.SCALE_STREAM_NAME, requestStreamConfig),
                createStream(scope, config.getKvtStreamName(), kvtStreamConfig)));
    }

    /**
     * Creates the given scope through the controller, retrying indefinitely with exponential
     * back-off. Each failed attempt is logged as a warning.
     *
     * @param str name of the scope to create
     * @return a future that completes once the scope creation call has succeeded
     */
    private CompletableFuture<Void> createScope(String str) {
        return Futures.toVoid(
                Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                        failure -> log.warn("Error creating event processor scope {} with exception {}", str, Exceptions.unwrap(failure).toString()))
                     .runAsync(() -> controller.createScope(str)
                                               .thenAccept(created -> log.info("Created controller scope {}", str)),
                             executor));
    }

    /**
     * Creates the given stream through the controller, retrying indefinitely with exponential
     * back-off. Each failed attempt is logged as a warning.
     *
     * @param str                 scope of the stream
     * @param str2                name of the stream
     * @param streamConfiguration configuration to create the stream with
     * @return a future that completes once the stream creation call has succeeded
     */
    private CompletableFuture<Void> createStream(String str, String str2, StreamConfiguration streamConfiguration) {
        return Futures.toVoid(
                Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                        failure -> log.warn("Error creating event processor stream {} with exception {}", str2, Exceptions.unwrap(failure).toString()))
                     .runAsync(() -> controller.createStream(str, str2, streamConfiguration)
                                               .thenAccept(created -> log.info("Created stream {}/{}", str, str2)),
                             executor));
    }

    /**
     * Bootstraps the event processors: creates the scope and streams, initializes the stream
     * writers of the supplied task objects, starts one periodic truncation loop per internal
     * stream and finally marks bootstrap as completed.
     *
     * <p>Each truncation loop keeps iterating while {@code isRunning()} returns true, and the delay
     * between iterations is the truncation interval as read once when this stage runs.
     *
     * @param streamTransactionMetadataTasks transaction tasks whose stream writers are initialized
     * @param streamMetadataTasks            stream tasks whose writer is initialized; also used for truncation
     * @param tableMetadataTasks             table tasks whose writer is initialized
     * @return a future that completes when bootstrap has finished
     */
    public CompletableFuture<Void> bootstrap(StreamTransactionMetadataTasks streamTransactionMetadataTasks, StreamMetadataTasks streamMetadataTasks, TableMetadataTasks tableMetadataTasks) {
        log.info("Bootstrapping controller event processors");
        return createStreams().thenAcceptAsync(ignored -> {
            streamMetadataTasks.initializeStreamWriters(clientFactory, config.getRequestStreamName());
            streamTransactionMetadataTasks.initializeStreamWriters((EventStreamClientFactory) clientFactory, config);
            tableMetadataTasks.initializeStreamWriters(clientFactory, config.getKvtStreamName());

            // Captured once: later calls to setTruncationInterval do not affect loops already started.
            final long delay = truncationInterval.get();

            // Request stream.
            Futures.loop(this::isRunning,
                    () -> Futures.delayedFuture(
                            () -> truncate(config.getRequestStreamName(), config.getRequestReaderGroupName(), streamMetadataTasks),
                            delay, executor),
                    executor);
            // Commit stream.
            Futures.loop(this::isRunning,
                    () -> Futures.delayedFuture(
                            () -> truncate(config.getCommitStreamName(), config.getCommitReaderGroupName(), streamMetadataTasks),
                            delay, executor),
                    executor);
            // Abort stream.
            Futures.loop(this::isRunning,
                    () -> Futures.delayedFuture(
                            () -> truncate(config.getAbortStreamName(), config.getAbortReaderGroupName(), streamMetadataTasks),
                            delay, executor),
                    executor);
            // KVT stream.
            Futures.loop(this::isRunning,
                    () -> Futures.delayedFuture(
                            () -> truncate(config.getKvtStreamName(), config.getKvtReaderGroupName(), streamMetadataTasks),
                            delay, executor),
                    executor);

            bootstrapCompleted.set(true);
            log.info("Completed bootstrapping event processors.");
        }, (Executor) executor);
    }

    /**
     * Submits a truncation request for one internal stream, based on the positions checkpointed by
     * the readers of its reader group.
     *
     * <p>The positions of every process known to the checkpoint store are collected and folded into
     * a single segment-id to offset map; when a segment appears in more than one position the larger
     * offset is kept. Failures, both synchronous and asynchronous, are logged and never propagated.
     *
     * @param str                 name of the stream to truncate
     * @param str2                name of the reader group whose positions are used
     * @param streamMetadataTasks tasks object used to start the truncation
     * @return a future that always completes normally with {@code null}
     * @throws IllegalStateException if the service is not running
     */
    @VisibleForTesting
    CompletableFuture<Void> truncate(String str, String str2, StreamMetadataTasks streamMetadataTasks) {
        Preconditions.checkState(isRunning());
        try {
            // Reader id -> last checkpointed position, across all processes.
            Map<String, Position> readerPositions = new HashMap<>();
            for (String process : checkpointStore.getProcesses()) {
                readerPositions.putAll(checkpointStore.getPositions(process, str2));
            }

            Map<Long, Long> streamCut = readerPositions.entrySet().stream()
                    .map(readerPosition -> readerPosition.getValue() == null
                            ? Collections.<Long, Long>emptyMap()
                            : convertPosition(readerPosition))
                    .reduce(Collections.emptyMap(), (merged, next) -> {
                        Map<Long, Long> combined = new HashMap<>(merged);
                        // Keep the larger offset when both maps know the segment.
                        next.forEach((segmentId, offset) -> combined.merge(segmentId, offset, Long::max));
                        return combined;
                    });

            return streamMetadataTasks.startTruncation(config.getScopeName(), str, streamCut, null)
                    .handle((submitted, failure) -> {
                        if (failure != null) {
                            log.warn("Submission for truncation for stream {} failed. Will be retried in next iteration.", str);
                        } else if (submitted) {
                            log.debug("truncation for stream {} at streamcut {} submitted.", str, streamCut);
                        } else {
                            log.debug("truncation for stream {} at streamcut {} rejected.", str, streamCut);
                        }
                        return null;
                    });
        } catch (Exception e) {
            Throwable cause = Exceptions.unwrap(e);
            log.warn("Encountered exception attempting to truncate stream {}. {}: {}", str, cause.getClass().getName(), cause.getMessage());
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Converts a reader position into a map keyed by segment id.
     *
     * @param entry reader-id/position pair; the position must not be {@code null}
     * @return map from the id of every segment owned in the position to its offset
     */
    private Map<Long, Long> convertPosition(Map.Entry<String, Position> entry) {
        Map<Segment, Long> ownedSegments = entry.getValue().asImpl().getOwnedSegmentsWithOffsets();
        return ownedSegments.entrySet().stream()
                .collect(Collectors.toMap(owned -> owned.getKey().getSegmentId(), Map.Entry::getValue));
    }

    /**
     * Finds the processes registered with an event processor group that are missing from the
     * supplied set and notifies the group of the failure of each of them.
     *
     * <p>Runs in three retried stages: fetch the processes registered with the group (a
     * {@code NoNode} checkpoint store error is treated as "no processes"), fetch the supplied set,
     * then notify the group once per registered process that is not in the supplied set.
     *
     * @param eventProcessorGroup group whose registered processes are checked
     * @param supplier            supplies the set of processes to compare against
     * @return a future that completes once every orphaned process has been reported to the group
     */
    private CompletableFuture<Void> handleOrphanedReaders(EventProcessorGroup<? extends ControllerEvent> eventProcessorGroup, Supplier<Set<String>> supplier) {
        return RetryHelper.withRetriesAsync(() -> CompletableFuture.supplyAsync(() -> {
            try {
                return eventProcessorGroup.getProcesses();
            } catch (CheckpointStoreException e) {
                if (e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                    return Collections.<String>emptySet();
                }
                throw new CompletionException(e);
            }
        }, executor), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, executor)
                .thenComposeAsync(registered -> RetryHelper.withRetriesAsync(() -> CompletableFuture.supplyAsync(() -> {
                    try {
                        // Left: supplied processes, right: processes registered with the group.
                        return new ImmutablePair<>(supplier.get(), registered);
                    } catch (Exception e) {
                        log.error(String.format("Error fetching current processes%s", eventProcessorGroup.toString()), e);
                        throw new CompletionException(e);
                    }
                }, executor), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, executor))
                .thenComposeAsync(processes -> {
                    Set<String> supplied = processes.getLeft();
                    Set<String> orphaned = processes.getRight();
                    if (orphaned == null || orphaned.isEmpty()) {
                        return CompletableFuture.completedFuture(null);
                    }
                    // Whatever remains after removing the supplied processes is orphaned.
                    if (supplied != null) {
                        orphaned.removeAll(supplied);
                    }
                    ArrayList<CompletableFuture<Void>> notifications = new ArrayList<>();
                    for (String process : orphaned) {
                        notifications.add(RetryHelper.withRetriesAsync(() -> CompletableFuture.runAsync(() -> {
                            try {
                                eventProcessorGroup.notifyProcessFailure(process);
                            } catch (CheckpointStoreException e) {
                                log.error(String.format("Error notifying failure of process=%s in event processor group %s", process, eventProcessorGroup.toString()), e);
                                throw new CompletionException(e);
                            }
                        }, executor), RetryHelper.RETRYABLE_PREDICATE, Integer.MAX_VALUE, executor));
                    }
                    return Futures.allOf(notifications);
                });
    }

    /**
     * Creates the commit, abort, stream request and KVT request event processor groups and blocks
     * until each of them is running.
     *
     * <p>Creation of every group is retried indefinitely with exponential back-off; each failed
     * attempt is logged with a message naming the group that could not be created. The commit and
     * abort groups take their processor count from the configuration, the two request groups use a
     * single processor. No group uses checkpointing ({@code CheckpointConfig.none()}).
     */
    private void initialize() {
        // Commit event processors.
        EventProcessorConfig<CommitEvent> commitConfig = EventProcessorConfig.<CommitEvent>builder()
                .config(EventProcessorGroupConfigImpl.builder()
                        .streamName(config.getCommitStreamName())
                        .readerGroupName(config.getCommitReaderGroupName())
                        .eventProcessorCount(config.getCommitReaderGroupSize())
                        .checkpointConfig(CheckpointConfig.none())
                        .build())
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(COMMIT_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(commitRequestHandler, executor))
                .minRebalanceIntervalMillis(rebalanceIntervalMillis)
                .build();
        log.debug("Creating commit event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                e -> log.warn("Error creating commit event processor group", e))
             .run(() -> {
                 commitEventProcessors = system.createEventProcessorGroup(commitConfig, checkpointStore, rebalanceExecutor);
                 return null;
             });

        // Abort event processors.
        EventProcessorConfig<AbortEvent> abortConfig = EventProcessorConfig.<AbortEvent>builder()
                .config(EventProcessorGroupConfigImpl.builder()
                        .streamName(config.getAbortStreamName())
                        .readerGroupName(config.getAbortReaderGroupName())
                        .eventProcessorCount(config.getAbortReaderGroupSize())
                        .checkpointConfig(CheckpointConfig.none())
                        .build())
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(ABORT_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(abortRequestHandler, executor))
                .minRebalanceIntervalMillis(rebalanceIntervalMillis)
                .build();
        log.debug("Creating abort event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                // Previously logged as a "commit" group failure, which misattributed abort failures.
                e -> log.warn("Error creating abort event processor group", e))
             .run(() -> {
                 abortEventProcessors = system.createEventProcessorGroup(abortConfig, checkpointStore, rebalanceExecutor);
                 return null;
             });

        // Stream request event processors.
        EventProcessorConfig<ControllerEvent> requestConfig = EventProcessorConfig.<ControllerEvent>builder()
                .config(EventProcessorGroupConfigImpl.builder()
                        .streamName(config.getRequestStreamName())
                        .readerGroupName(config.getRequestReaderGroupName())
                        .eventProcessorCount(1)
                        .checkpointConfig(CheckpointConfig.none())
                        .build())
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(CONTROLLER_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(streamRequestHandler, executor))
                .minRebalanceIntervalMillis(rebalanceIntervalMillis)
                .build();
        log.debug("Creating stream request event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                e -> log.warn("Error creating request event processor group", e))
             .run(() -> {
                 requestEventProcessors = system.createEventProcessorGroup(requestConfig, checkpointStore, rebalanceExecutor);
                 return null;
             });

        // KVT request event processors.
        EventProcessorConfig<ControllerEvent> kvtRequestConfig = EventProcessorConfig.<ControllerEvent>builder()
                .config(EventProcessorGroupConfigImpl.builder()
                        .streamName(config.getKvtStreamName())
                        .readerGroupName(config.getKvtReaderGroupName())
                        .eventProcessorCount(1)
                        .checkpointConfig(CheckpointConfig.none())
                        .build())
                .decider(ExceptionHandler.DEFAULT_EXCEPTION_HANDLER)
                .serializer(CONTROLLER_EVENT_SERIALIZER)
                .supplier(() -> new ConcurrentEventProcessor<>(kvtRequestHandler, executor))
                .minRebalanceIntervalMillis(rebalanceIntervalMillis)
                .build();
        log.debug("Creating kvt request event processors");
        Retry.indefinitelyWithExpBackoff(DELAY, MULTIPLIER, MAX_DELAY,
                // Previously indistinguishable from the stream request group's failure message.
                e -> log.warn("Error creating kvt request event processor group", e))
             .run(() -> {
                 kvtRequestEventProcessors = system.createEventProcessorGroup(kvtRequestConfig, checkpointStore, rebalanceExecutor);
                 return null;
             });

        // Block until every group reports that it is running.
        log.info("Awaiting start of event processors...");
        commitEventProcessors.awaitRunning();
        log.info("Commit event processor started.");
        abortEventProcessors.awaitRunning();
        log.info("Abort event processor started.");
        requestEventProcessors.awaitRunning();
        log.info("Stream request event processor started.");
        kvtRequestEventProcessors.awaitRunning();
        log.info("KVT request event processor started.");
    }

    /**
     * Stops all event processor groups that exist. Works in two passes: first every group is asked
     * to stop asynchronously, then the termination of each group is awaited, so that the groups
     * shut down in parallel rather than one after the other.
     */
    private void stopEventProcessors() {
        // Pass 1: request the stop of every existing group.
        if (commitEventProcessors != null) {
            log.info("Stopping commit event processors");
            commitEventProcessors.stopAsync();
        }
        if (abortEventProcessors != null) {
            log.info("Stopping abort event processors");
            abortEventProcessors.stopAsync();
        }
        if (requestEventProcessors != null) {
            log.info("Stopping request event processors");
            requestEventProcessors.stopAsync();
        }
        if (kvtRequestEventProcessors != null) {
            log.info("Stopping kvt request event processors");
            kvtRequestEventProcessors.stopAsync();
        }

        // Pass 2: wait for each group to terminate.
        if (commitEventProcessors != null) {
            log.info("Awaiting termination of commit event processors");
            commitEventProcessors.awaitTerminated();
        }
        if (abortEventProcessors != null) {
            log.info("Awaiting termination of abort event processors");
            abortEventProcessors.awaitTerminated();
        }
        if (requestEventProcessors != null) {
            log.info("Awaiting termination of request event processors");
            requestEventProcessors.awaitTerminated();
        }
        if (kvtRequestEventProcessors != null) {
            log.info("Awaiting termination of kvt request event processors");
            kvtRequestEventProcessors.awaitTerminated();
        }
    }

    /**
     * Overrides the truncation interval. The value is read once by {@code bootstrap(...)}, so it
     * only affects truncation loops started after this call.
     *
     * @param j new interval, used as the delay between truncation attempts
     */
    @VisibleForTesting
    void setTruncationInterval(long j) {
        truncationInterval.set(j);
    }

    /**
     * Closes the client factory, then stops the service and waits for it to terminate for at most
     * the shutdown timeout taken from the configuration. A timeout is logged, not propagated.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // NOTE(review): shutDown() closes the client factory as well, so it is closed twice when the
        // service was running - presumably close() is idempotent; confirm.
        clientFactory.close();
        try {
            stopAsync().awaitTerminated(config.getShutdownTimeout().toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException timedOut) {
            log.error("Timeout expired while waiting for service to shut down.", timedOut);
        }
    }

    /**
     * Returns the flag object that tracks bootstrap completion. This is the live
     * {@link AtomicBoolean} instance held by this object, not a snapshot of its value.
     *
     * @return the bootstrap-completed flag
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public AtomicBoolean getBootstrapCompleted() {
        return bootstrapCompleted;
    }
}
