package io.pravega.controller.eventProcessor.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderSegmentDistribution;
import io.pravega.client.stream.Stream;
import io.pravega.common.Exceptions;
import io.pravega.common.LoggerHelpers;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/eventProcessor/impl/EventProcessorGroupImpl.class */
public final class EventProcessorGroupImpl<T extends ControllerEvent> extends AbstractIdleService implements EventProcessorGroup<T> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log;
    private final String objectId;
    private final EventProcessorSystemImpl actorSystem;
    private final EventProcessorConfig<T> eventProcessorConfig;

    @VisibleForTesting
    private final ConcurrentHashMap<String, EventProcessorCell<T>> eventProcessorMap;
    private final EventStreamWriter<T> writer;
    private ReaderGroup readerGroup;
    private final CheckpointStore checkpointStore;
    private final ScheduledExecutorService rebalanceExecutor;
    private ScheduledFuture<?> rebalanceFuture;
    private final long rebalancePeriodMillis;
    private final Object lock;
    static final /* synthetic */ boolean $assertionsDisabled;

    EventProcessorGroupImpl(EventProcessorSystemImpl eventProcessorSystemImpl, EventProcessorConfig<T> eventProcessorConfig, CheckpointStore checkpointStore) {
        this(eventProcessorSystemImpl, eventProcessorConfig, checkpointStore, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventProcessorGroupImpl(EventProcessorSystemImpl eventProcessorSystemImpl, EventProcessorConfig<T> eventProcessorConfig, CheckpointStore checkpointStore, ScheduledExecutorService scheduledExecutorService) {
        this.lock = new Object();
        this.objectId = String.format("EventProcessorGroup[%s]", eventProcessorConfig.getConfig().getReaderGroupName());
        this.actorSystem = eventProcessorSystemImpl;
        this.eventProcessorConfig = eventProcessorConfig;
        this.rebalanceExecutor = scheduledExecutorService;
        this.eventProcessorMap = new ConcurrentHashMap<>();
        this.writer = eventProcessorSystemImpl.clientFactory.createEventWriter(eventProcessorConfig.getConfig().getStreamName(), eventProcessorConfig.getSerializer(), EventWriterConfig.builder().enableConnectionPooling(true).retryAttempts(Integer.MAX_VALUE).build());
        this.checkpointStore = checkpointStore;
        this.rebalancePeriodMillis = eventProcessorConfig.getRebalancePeriodMillis();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize() throws CheckpointStoreException {
        try {
            this.checkpointStore.addReaderGroup(this.actorSystem.getProcess(), this.eventProcessorConfig.getConfig().getReaderGroupName());
        } catch (CheckpointStoreException e) {
            if (!e.getType().equals(CheckpointStoreException.Type.NodeExists)) {
                throw e;
            }
            log.debug("reader group {} exists", this.eventProcessorConfig.getConfig().getReaderGroupName());
        }
        this.readerGroup = createIfNotExists(this.actorSystem.readerGroupManager, this.eventProcessorConfig.getConfig().getReaderGroupName(), ReaderGroupConfig.builder().disableAutomaticCheckpoints().stream(Stream.of(this.actorSystem.getScope(), this.eventProcessorConfig.getConfig().getStreamName())).build());
        createEventProcessors(this.eventProcessorConfig.getConfig().getEventProcessorCount() - this.eventProcessorMap.values().size());
    }

    private ReaderGroup createIfNotExists(ReaderGroupManager readerGroupManager, String str, ReaderGroupConfig readerGroupConfig) {
        readerGroupManager.createReaderGroup(str, readerGroupConfig);
        return readerGroupManager.getReaderGroup(str);
    }

    private List<String> createEventProcessors(int i) throws CheckpointStoreException {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            String uuid = UUID.randomUUID().toString();
            this.checkpointStore.addReader(this.actorSystem.getProcess(), this.eventProcessorConfig.getConfig().getReaderGroupName(), uuid);
            EventProcessorCell<T> eventProcessorCell = new EventProcessorCell<>(this.eventProcessorConfig, this.actorSystem.clientFactory.createReader(uuid, this.eventProcessorConfig.getConfig().getReaderGroupName(), this.eventProcessorConfig.getSerializer(), ReaderConfig.builder().disableTimeWindows(true).build()), this.writer, this.actorSystem.getProcess(), uuid, i2, this.checkpointStore);
            log.info("Created event processor {}, id={}", Integer.valueOf(i2), eventProcessorCell.toString());
            this.eventProcessorMap.put(uuid, eventProcessorCell);
            arrayList.add(uuid);
            try {
                this.checkpointStore.addReader(this.actorSystem.getProcess(), this.eventProcessorConfig.getConfig().getReaderGroupName(), uuid);
            } catch (CheckpointStoreException e) {
                if (!e.getType().equals(CheckpointStoreException.Type.NodeExists)) {
                    throw e;
                }
            }
        }
        return arrayList;
    }

    protected void startUp() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp", new Object[0]);
        try {
            log.info("Attempting to start all event processors in {}", toString());
            this.eventProcessorMap.entrySet().forEach(entry -> {
                ((EventProcessorCell) entry.getValue()).startAsync();
            });
            this.eventProcessorMap.entrySet().forEach(entry2 -> {
                ((EventProcessorCell) entry2.getValue()).awaitStartupComplete();
            });
            log.info("All event processors in {} started successfully.", toString());
            if (this.rebalancePeriodMillis <= 0 || this.rebalanceExecutor == null) {
                this.rebalanceFuture = null;
            } else {
                this.rebalanceFuture = this.rebalanceExecutor.scheduleWithFixedDelay(this::rebalance, this.rebalancePeriodMillis, this.rebalancePeriodMillis, TimeUnit.MILLISECONDS);
            }
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    protected void shutDown() {
        log.info("Shutting down all event processors in {}", toString());
        synchronized (this.lock) {
            long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown", new Object[0]);
            try {
                cleanUpCheckpointStore();
                this.readerGroup.close();
                if (this.rebalanceFuture != null) {
                    this.rebalanceFuture.cancel(true);
                }
                this.writer.close();
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
            } catch (Throwable th) {
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
                throw th;
            }
        }
        log.info("Shut down all event processors in {} complete.", toString());
    }

    private void cleanUpCheckpointStore() {
        for (EventProcessorCell<T> eventProcessorCell : this.eventProcessorMap.values()) {
            log.info("Terminating event processor cell: {}", eventProcessorCell);
            eventProcessorCell.stopAsync();
        }
        for (EventProcessorCell<T> eventProcessorCell2 : this.eventProcessorMap.values()) {
            try {
                eventProcessorCell2.awaitTerminated();
                log.debug("Termination of event processor cell: {} completed successfully.", eventProcessorCell2);
            } catch (Exception e) {
                log.warn("Failed terminating event processor cell {}.", eventProcessorCell2, e);
            }
        }
        try {
            log.debug("Attempting to clean up reader group {} entry from checkpoint store", this.objectId);
            this.checkpointStore.removeProcessFromGroup(this.actorSystem.getProcess(), this.readerGroup.getGroupName());
        } catch (CheckpointStoreException e2) {
            log.warn("Error removing reader group " + this.objectId, e2);
        }
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public void notifyProcessFailure(String str) throws CheckpointStoreException {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "notifyProcessFailure", new Object[]{str});
        log.info("Notifying failure of process {} participating in reader group {}", str, this.objectId);
        try {
            try {
                log.debug("Removing reader group {} from process {}", this.readerGroup.getGroupName(), str);
                for (Map.Entry<String, Position> entry : this.checkpointStore.removeProcessFromGroup(str, this.readerGroup.getGroupName()).entrySet()) {
                    log.info("{} Notifying readerOffline reader={}, position={}", new Object[]{this.objectId, entry.getKey(), entry.getValue()});
                    this.readerGroup.readerOffline(entry.getKey(), entry.getValue());
                }
                LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            } catch (CheckpointStoreException e) {
                if (!e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                    throw e;
                }
                LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            }
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            throw th;
        }
    }

    void changeEventProcessorCount(int i) throws CheckpointStoreException {
        synchronized (this.lock) {
            Preconditions.checkState(isRunning(), state().name());
            if (i <= 0) {
                throw new NotImplementedException("Decrease processor count");
            }
            createEventProcessors(i).stream().forEach(str -> {
                this.eventProcessorMap.get(str).startAsync();
            });
        }
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public EventStreamWriter<T> getWriter() {
        return this.writer;
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public Set<String> getProcesses() throws CheckpointStoreException {
        return this.checkpointStore.getProcesses();
    }

    @VisibleForTesting
    void rebalance() {
        try {
            ReaderSegmentDistribution readerSegmentDistribution = this.readerGroup.getReaderSegmentDistribution();
            Map readerSegmentDistribution2 = readerSegmentDistribution.getReaderSegmentDistribution();
            int size = readerSegmentDistribution2.size();
            int intValue = ((Integer) readerSegmentDistribution2.values().stream().reduce(0, (v0, v1) -> {
                return Integer.sum(v0, v1);
            })).intValue() + readerSegmentDistribution.getUnassignedSegments();
            if (readerSegmentDistribution2.entrySet().stream().anyMatch(entry -> {
                return !Strings.isNullOrEmpty((String) entry.getKey()) && ((Integer) entry.getValue()).intValue() == 0;
            })) {
                readerSegmentDistribution2.forEach((str, num) -> {
                    if (!Strings.isNullOrEmpty(str) && this.eventProcessorMap.containsKey(str) && isRebalanceCandidate(num.intValue(), size, intValue)) {
                        replaceCell(str);
                    }
                });
            }
        } catch (Exception e) {
            log.warn("Re-balance failed with exception {} {}", Exceptions.unwrap(e).getClass().getSimpleName(), e.getMessage());
        }
    }

    private boolean isRebalanceCandidate(int i, int i2, int i3) {
        return ((double) i) >= (((double) i3) / ((double) i2)) + 1.0d;
    }

    private void replaceCell(String str) {
        synchronized (this.lock) {
            Preconditions.checkState(isRunning(), state().name());
            log.info("Found overloaded reader: {}", str);
            try {
                List<String> createEventProcessors = createEventProcessors(1);
                if (!$assertionsDisabled && createEventProcessors.size() != 1) {
                    throw new AssertionError();
                }
                this.eventProcessorMap.get(createEventProcessors.get(0)).startAsync();
                EventProcessorCell<T> eventProcessorCell = this.eventProcessorMap.get(str);
                log.info("Stopping event processor cell: {}", eventProcessorCell);
                try {
                    eventProcessorCell.stopAsync();
                    eventProcessorCell.awaitTerminated();
                    this.checkpointStore.removeReader(eventProcessorCell.getProcess(), this.readerGroup.getGroupName(), str);
                    this.eventProcessorMap.remove(str);
                    log.info("Termination of event processor cell: {} completed successfully.", eventProcessorCell);
                } catch (Exception e) {
                    log.warn("Failed terminating event processor cell {}.", eventProcessorCell, e);
                }
            } catch (CheckpointStoreException e2) {
                log.warn("Unable to create a new event processor cell", e2.getMessage());
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stopAsync();
        this.writer.close();
    }

    public String toString() {
        return this.objectId;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    ConcurrentHashMap<String, EventProcessorCell<T>> getEventProcessorMap() {
        return this.eventProcessorMap;
    }

    static {
        $assertionsDisabled = !EventProcessorGroupImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(EventProcessorGroupImpl.class);
    }
}
