package io.pravega.controller.eventProcessor.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.Position;
import io.pravega.common.LoggerHelpers;
import io.pravega.controller.eventProcessor.CheckpointConfig;
import io.pravega.controller.eventProcessor.EventProcessorConfig;
import io.pravega.controller.eventProcessor.EventProcessorInitException;
import io.pravega.controller.eventProcessor.EventProcessorReinitException;
import io.pravega.controller.eventProcessor.ExceptionHandler;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.checkpoint.CheckpointStore;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import io.pravega.shared.controller.event.ControllerEvent;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/controller/eventProcessor/impl/EventProcessorCell.class */
/**
 * Couples a single {@link EventProcessor} (the "actor") with the reader it consumes events from
 * and with a Guava {@link Service} delegate that runs the read/process loop.
 *
 * <p>The cell owns the lifecycle: callers start and stop it through {@link #startAsync()},
 * {@link #stopAsync()}, {@link #awaitStartupComplete()} and {@link #awaitTerminated()}, all of
 * which forward to the internal delegate.
 *
 * @param <T> type of the controller events handled by this cell
 */
public class EventProcessorCell<T extends ControllerEvent> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventProcessorCell.class);

    private final EventStreamReader<T> reader;
    private final EventStreamWriter<T> selfWriter;
    private final CheckpointStore checkpointStore;
    private final String process;
    private final String readerGroupName;
    private final String readerId;
    /** Human readable identity used in every log line: {@code EventProcessor[group:index:readerId]}. */
    private final String objectId;
    /** Last position handed to the checkpointer after it was written to the checkpoint store. */
    private final AtomicReference<Position> lastCheckpoint;

    /** Current event processor instance; replaced with a fresh one by {@code Delegate.restart}. */
    @VisibleForTesting
    private EventProcessor<T> actor;

    /** Service that executes the read/process loop for {@link #actor}. */
    private final Service delegate;

    /**
     * Tracks how many events were processed and when the last checkpoint was taken, and persists
     * the reader position once the configured event-count or time threshold is reached.
     * Checkpointing is only enabled for {@link CheckpointConfig.Type#Periodic} configurations.
     */
    @NotThreadSafe
    private class CheckpointState {
        private final boolean enableCheckpoint;
        private final CheckpointConfig.CheckpointPeriod checkpointPeriod;
        private int count;
        private int previousCheckpointIndex;
        private long previousCheckpointTimestamp;

        CheckpointState(CheckpointConfig checkpointConfig) {
            this.enableCheckpoint = checkpointConfig.getType() == CheckpointConfig.Type.Periodic;
            this.checkpointPeriod = this.enableCheckpoint ? checkpointConfig.getCheckpointPeriod() : null;
            this.count = 0;
            this.previousCheckpointIndex = 0;
            this.previousCheckpointTimestamp = System.currentTimeMillis();
        }

        /**
         * Records one processed event and, if enough events or enough time have passed since the
         * previous checkpoint, stores {@code position} through the actor's checkpointer.
         * A failure to persist is logged and otherwise ignored; the counters are left untouched
         * so the next call attempts the checkpoint again.
         *
         * @param position position of the event that has just been processed
         */
        void store(Position position) {
            if (!this.enableCheckpoint) {
                return;
            }
            this.count++;
            final long now = System.currentTimeMillis();
            final int eventsSinceCheckpoint = this.count - this.previousCheckpointIndex;
            final long millisSinceCheckpoint = now - this.previousCheckpointTimestamp;
            // 1000L forces long arithmetic so a large configured period cannot overflow int.
            final boolean due = eventsSinceCheckpoint >= this.checkpointPeriod.getNumEvents()
                    || millisSinceCheckpoint >= 1000L * this.checkpointPeriod.getNumSeconds();
            if (!due) {
                return;
            }
            try {
                actor.getCheckpointer().store(position);
                this.previousCheckpointIndex = this.count;
                this.previousCheckpointTimestamp = now;
            } catch (CheckpointStoreException e) {
                // Log the exception itself: its cause may be null, which would hide the failure.
                log.warn(String.format("Failed persisting checkpoint for event processor %s", objectId), e);
            }
        }
    }

    /**
     * Service implementation that runs the event loop: it reads events, hands them to the actor,
     * checkpoints positions and applies the configured {@link ExceptionHandler} on failures.
     */
    private class Delegate extends AbstractExecutionThreadService {
        /** Timeout passed to {@code readNextEvent}; presumably milliseconds - confirm against the reader API. */
        private final long defaultTimeout = 2000;
        private final EventProcessorConfig<T> eventProcessorConfig;
        /**
         * Most recent result of {@code readNextEvent}.
         * NOTE(review): this is not cleared before a read, so when the read itself throws, the
         * previously read event is what gets passed to {@code beforeRestart} - confirm this is intended.
         */
        private EventRead<T> event;
        private final CheckpointState state;

        Delegate(EventProcessorConfig<T> eventProcessorConfig) {
            this.eventProcessorConfig = eventProcessorConfig;
            this.state = new CheckpointState(eventProcessorConfig.getConfig().getCheckpointConfig());
        }

        /** Invokes the actor's {@code beforeStart} hook; failures are routed to the exception handler. */
        @Override
        protected final void startUp() {
            log.info("Event processor STARTUP {}, state={}", objectId, state());
            try {
                actor.beforeStart();
            } catch (Exception e) {
                log.warn(String.format("Failed while executing preStart for event processor %s", objectId), e);
                handleException(new EventProcessorInitException(e));
            }
        }

        /**
         * Main loop: while the service is running, reads the next event, lets the actor process it
         * and offers its position to the checkpoint state. Any exception is passed to
         * {@link #handleException(Exception)} and the loop continues unless the service was stopped.
         */
        @Override
        protected final void run() throws Exception {
            log.debug("Event processor RUN {}, state={}", objectId, state());
            while (isRunning()) {
                try {
                    this.event = reader.readNextEvent(this.defaultTimeout);
                    if (this.event != null && this.event.getEvent() != null) {
                        actor.process(this.event.getEvent(), this.event.getPosition());
                        this.state.store(this.event.getPosition());
                    }
                } catch (Exception e) {
                    handleException(e);
                }
            }
        }

        /**
         * Invokes the actor's {@code afterStop} hook and then always closes the reader.
         *
         * @throws Exception the exception thrown by {@code afterStop}, rethrown after logging
         */
        @Override
        protected final void shutDown() throws Exception {
            log.info("Event processor SHUTDOWN {}, state={}", objectId, state());
            try {
                actor.afterStop();
            } catch (Exception e) {
                log.warn(String.format("Failed while executing afterStop for event processor %s", objectId), e);
                throw e;
            } finally {
                closeReader();
            }
        }

        /** Closes the reader at the last stored checkpoint; a failure to close is only logged. */
        private void closeReader() {
            log.info("Closing reader for {}", objectId);
            try {
                reader.closeAt(getCheckpoint());
            } catch (Exception e) {
                log.info("Exception while closing EventProcessorCell reader from checkpointStore: {}.", e.getMessage());
            }
        }

        /**
         * Replaces the actor with a freshly created one after notifying the old actor through
         * {@code beforeRestart}, then runs the start-up hook on the new instance.
         *
         * @param error       the failure that triggered the restart
         * @param failedEvent the event held by the delegate when the failure occurred, may be null
         */
        private void restart(Throwable error, T failedEvent) {
            log.debug("Event processor RESTART {}, state={}", objectId, state());
            try {
                actor.beforeRestart(error, failedEvent);
                actor = createEventProcessor(this.eventProcessorConfig);
                startUp();
            } catch (Exception e) {
                log.warn(String.format("Failed while executing preRestart for event processor %s", objectId), e);
                handleException(new EventProcessorReinitException(e));
            }
        }

        /**
         * Asks the configured {@link ExceptionHandler} what to do about {@code exc} and acts on the
         * returned directive: restart the actor, resume the loop, or stop this service.
         *
         * @param exc the exception raised while starting, restarting or processing
         */
        private void handleException(Exception exc) {
            final ExceptionHandler.Directive directive = this.eventProcessorConfig.getExceptionHandler().run(exc);
            switch (directive) {
                case Restart:
                    log.warn("Restarting event processor: {} due to exception: {}", objectId, exc);
                    restart(exc, this.event == null ? null : this.event.getEvent());
                    break;
                case Resume:
                    // Nothing to do: the run loop simply carries on with the next read.
                    log.debug("Resuming event processor: {} after receiving exception: {}", objectId, exc);
                    break;
                case Stop:
                    log.warn("Stopping event processor: {} due to exception: {}", objectId, exc);
                    // Unqualified call resolves to this service's own stopAsync(), not the cell's.
                    stopAsync();
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Creates a cell, instantiates its event processor from the config's supplier and prepares
     * (but does not start) the delegate service.
     *
     * @param eventProcessorConfig configuration supplying the processor, checkpoint settings and exception handler
     * @param eventStreamReader    reader the events are consumed from
     * @param eventStreamWriter    writer exposed to the processor as its self-writer
     * @param process              process identifier used as checkpoint store key; must be non-empty
     * @param readerId             reader identifier used as checkpoint store key; must be non-empty
     * @param index                index of this cell, used only in the log identity
     * @param checkpointStore      store that reader positions are persisted to
     */
    public EventProcessorCell(EventProcessorConfig<T> eventProcessorConfig, EventStreamReader<T> eventStreamReader, EventStreamWriter<T> eventStreamWriter, String process, String readerId, int index, CheckpointStore checkpointStore) {
        Preconditions.checkNotNull(eventProcessorConfig);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(process));
        Preconditions.checkArgument(!Strings.isNullOrEmpty(readerId));
        this.reader = Preconditions.checkNotNull(eventStreamReader);
        this.selfWriter = Preconditions.checkNotNull(eventStreamWriter);
        this.checkpointStore = Preconditions.checkNotNull(checkpointStore);
        this.process = process;
        this.readerGroupName = eventProcessorConfig.getConfig().getReaderGroupName();
        this.readerId = readerId;
        this.objectId = String.format("EventProcessor[%s:%d:%s]", this.readerGroupName, index, readerId);
        this.lastCheckpoint = new AtomicReference<>();
        this.actor = createEventProcessor(eventProcessorConfig);
        this.delegate = new Delegate(eventProcessorConfig);
    }

    /** Requests an asynchronous start of the delegate service. */
    public final void startAsync() {
        final long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "startAsync");
        try {
            this.delegate.startAsync();
        } finally {
            LoggerHelpers.traceLeave(log, this.objectId, "startAsync", traceId);
        }
    }

    /** Requests an asynchronous stop of the delegate service. */
    public final void stopAsync() {
        final long traceId = LoggerHelpers.traceEnterWithContext(log, this.objectId, "stopAsync");
        try {
            this.delegate.stopAsync();
            log.info("Event processor cell {} SHUTDOWN issued", this.objectId);
        } finally {
            LoggerHelpers.traceLeave(log, this.objectId, "stopAsync", traceId);
        }
    }

    /**
     * Blocks until the delegate is running. If waiting fails because the delegate is already
     * stopping or terminated, the failure is swallowed; any other state rethrows it.
     *
     * @throws IllegalStateException if the delegate ended up in a state other than STOPPING or TERMINATED
     */
    public final void awaitStartupComplete() {
        try {
            this.delegate.awaitRunning();
        } catch (IllegalStateException e) {
            final Service.State current = this.delegate.state();
            final boolean shuttingDown = current == Service.State.STOPPING || current == Service.State.TERMINATED;
            if (!shuttingDown) {
                throw e;
            }
        }
    }

    /** Blocks until the delegate service has terminated. */
    public final void awaitTerminated() {
        this.delegate.awaitTerminated();
        log.info("Event processor cell {} Terminated", this.objectId);
    }

    /**
     * Obtains a new event processor from the config's supplier and wires in its checkpointer
     * (which writes to the checkpoint store and remembers the position) and its self-writer.
     */
    private EventProcessor<T> createEventProcessor(EventProcessorConfig<T> eventProcessorConfig) {
        final EventProcessor<T> eventProcessor = eventProcessorConfig.getSupplier().get();
        eventProcessor.checkpointer = position -> {
            this.checkpointStore.setPosition(this.process, this.readerGroupName, this.readerId, position);
            this.lastCheckpoint.set(position);
        };
        eventProcessor.selfWriter = this.selfWriter::writeEvent;
        return eventProcessor;
    }

    /** Returns the last position stored through the checkpointer, or null if none was stored yet. */
    Position getCheckpoint() {
        return this.lastCheckpoint.get();
    }

    @Override
    public String toString() {
        return String.format("%s[%s]", this.objectId, this.delegate.state());
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String getProcess() {
        return this.process;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    String getReaderId() {
        return this.readerId;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    EventProcessor<T> getActor() {
        return this.actor;
    }
}
