package io.pravega.controller.eventProcessor.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.Stream;
import io.pravega.common.LoggerHelpers;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorGroup;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/eventProcessor/impl/EventProcessorGroupImpl.class */
public final class EventProcessorGroupImpl<T extends ControllerEvent> extends AbstractIdleService implements EventProcessorGroup<T> {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(EventProcessorGroupImpl.class);
    private final String objectId;
    private final EventProcessorSystemImpl actorSystem;
    private final EventProcessorConfig<T> eventProcessorConfig;
    private final EventStreamWriter<T> writer;
    private ReaderGroup readerGroup;
    private final CheckpointStore checkpointStore;
    private final Object lock = new Object();

    @VisibleForTesting
    private final ConcurrentHashMap<String, EventProcessorCell<T>> eventProcessorMap = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventProcessorGroupImpl(EventProcessorSystemImpl eventProcessorSystemImpl, EventProcessorConfig<T> eventProcessorConfig, CheckpointStore checkpointStore) {
        this.objectId = String.format("EventProcessorGroup[%s]", eventProcessorConfig.getConfig().getReaderGroupName());
        this.actorSystem = eventProcessorSystemImpl;
        this.eventProcessorConfig = eventProcessorConfig;
        this.writer = eventProcessorSystemImpl.clientFactory.createEventWriter(eventProcessorConfig.getConfig().getStreamName(), eventProcessorConfig.getSerializer(), EventWriterConfig.builder().build());
        this.checkpointStore = checkpointStore;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize() throws CheckpointStoreException {
        try {
            this.checkpointStore.addReaderGroup(this.actorSystem.getProcess(), this.eventProcessorConfig.getConfig().getReaderGroupName());
        } catch (CheckpointStoreException e) {
            if (!e.getType().equals(CheckpointStoreException.Type.NodeExists)) {
                throw e;
            }
            log.warn("reader group {} exists", this.eventProcessorConfig.getConfig().getReaderGroupName());
        }
        this.readerGroup = createIfNotExists(this.actorSystem.readerGroupManager, this.eventProcessorConfig.getConfig().getReaderGroupName(), ReaderGroupConfig.builder().disableAutomaticCheckpoints().stream(Stream.of(this.actorSystem.getScope(), this.eventProcessorConfig.getConfig().getStreamName())).build());
        createEventProcessors(this.eventProcessorConfig.getConfig().getEventProcessorCount() - this.eventProcessorMap.values().size());
    }

    private ReaderGroup createIfNotExists(ReaderGroupManager readerGroupManager, String str, ReaderGroupConfig readerGroupConfig) {
        readerGroupManager.createReaderGroup(str, readerGroupConfig);
        return readerGroupManager.getReaderGroup(str);
    }

    private List<String> createEventProcessors(int i) throws CheckpointStoreException {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            String uuid = UUID.randomUUID().toString();
            this.checkpointStore.addReader(this.actorSystem.getProcess(), this.eventProcessorConfig.getConfig().getReaderGroupName(), uuid);
            EventProcessorCell<T> eventProcessorCell = new EventProcessorCell<>(this.eventProcessorConfig, this.actorSystem.clientFactory.createReader(uuid, this.eventProcessorConfig.getConfig().getReaderGroupName(), this.eventProcessorConfig.getSerializer(), ReaderConfig.builder().build()), this.writer, this.actorSystem.getProcess(), uuid, i2, this.checkpointStore);
            log.info("Created event processor {}, id={}", Integer.valueOf(i2), eventProcessorCell.toString());
            this.eventProcessorMap.put(uuid, eventProcessorCell);
            arrayList.add(uuid);
        }
        return arrayList;
    }

    protected void startUp() {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startUp", new Object[0]);
        try {
            log.info("Attempting to start all event processors in {}", toString());
            this.eventProcessorMap.entrySet().forEach(entry -> {
                ((EventProcessorCell) entry.getValue()).startAsync();
            });
            log.info("Waiting for all all event processors in {} to start", toString());
            this.eventProcessorMap.entrySet().forEach(entry2 -> {
                ((EventProcessorCell) entry2.getValue()).awaitStartupComplete();
            });
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, this.objectId, "startUp", traceEnterWithContext, new Object[0]);
            throw th;
        }
    }

    protected void shutDown() {
        synchronized (this.lock) {
            long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "shutDown", new Object[0]);
            try {
                try {
                    log.info("Attempting to seal the reader group entry from checkpoint store");
                    this.checkpointStore.sealReaderGroup(this.actorSystem.getProcess(), this.readerGroup.getGroupName());
                } catch (CheckpointStoreException e) {
                    log.warn("Error sealing reader group " + this.objectId, e);
                }
                for (EventProcessorCell<T> eventProcessorCell : this.eventProcessorMap.values()) {
                    log.info("Stopping {}", eventProcessorCell);
                    eventProcessorCell.stopAsync();
                    log.info("Awaiting termination of {}", eventProcessorCell);
                    try {
                        eventProcessorCell.awaitTerminated();
                    } catch (IllegalStateException e2) {
                        log.warn(String.format("Failed terminating %s", eventProcessorCell), e2.getMessage());
                    }
                }
                try {
                    log.info("Attempting to clean up reader group entry from checkpoint store");
                    this.checkpointStore.removeReaderGroup(this.actorSystem.getProcess(), this.readerGroup.getGroupName());
                } catch (CheckpointStoreException e3) {
                    log.warn("Error removing reader group " + this.objectId, e3);
                }
                this.readerGroup.close();
                log.info("Shutdown of {} complete", toString());
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
            } catch (Throwable th) {
                LoggerHelpers.traceLeave(log, this.objectId, "shutDown", traceEnterWithContext, new Object[0]);
                throw th;
            }
        }
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public void notifyProcessFailure(String str) throws CheckpointStoreException {
        long traceEnterWithContext = LoggerHelpers.traceEnterWithContext(log, this.objectId, "notifyProcessFailure", new Object[]{str});
        log.info("Notifying failure of process {} participating in reader group {}", str, this.objectId);
        try {
            try {
                for (Map.Entry<String, Position> entry : this.checkpointStore.sealReaderGroup(str, this.readerGroup.getGroupName()).entrySet()) {
                    log.info("{} Notifying readerOffline reader={}, position={}", new Object[]{this.objectId, entry.getKey(), entry.getValue()});
                    this.readerGroup.readerOffline(entry.getKey(), entry.getValue());
                    log.info("{} removing reader={} from checkpoint store", this.objectId, entry.getKey());
                    this.checkpointStore.removeReader(str, this.readerGroup.getGroupName(), entry.getKey());
                }
                log.info("Removing reader group {} from process {}", this.readerGroup.getGroupName(), str);
                this.checkpointStore.removeReaderGroup(str, this.readerGroup.getGroupName());
                LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            } catch (CheckpointStoreException e) {
                if (!e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                    throw e;
                }
                LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            }
        } catch (Throwable th) {
            LoggerHelpers.traceLeave(log, "notifyProcessFailure", traceEnterWithContext, new Object[]{str});
            throw th;
        }
    }

    public void changeEventProcessorCount(int i) throws CheckpointStoreException {
        synchronized (this.lock) {
            Preconditions.checkState(isRunning(), state().name());
            if (i <= 0) {
                throw new NotImplementedException("Decrease processor count");
            }
            createEventProcessors(i).stream().forEach(str -> {
                this.eventProcessorMap.get(str).startAsync();
            });
        }
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public EventStreamWriter<T> getWriter() {
        return this.writer;
    }

    @Override // io.pravega.controller.eventProcessor.EventProcessorGroup
    public Set<String> getProcesses() throws CheckpointStoreException {
        return this.checkpointStore.getProcesses();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        stopAsync();
    }

    public String toString() {
        return this.objectId;
    }

    @SuppressFBWarnings(justification = "generated code")
    ConcurrentHashMap<String, EventProcessorCell<T>> getEventProcessorMap() {
        return this.eventProcessorMap;
    }
}
