package io.openmessaging.storage.dledger.statemachine;

import io.openmessaging.storage.dledger.DLedgerEntryPusher;
import io.openmessaging.storage.dledger.DLedgerServer;
import io.openmessaging.storage.dledger.MemberState;
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.snapshot.SnapshotManager;
import io.openmessaging.storage.dledger.snapshot.SnapshotMeta;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.snapshot.SnapshotStatus;
import io.openmessaging.storage.dledger.snapshot.SnapshotWriter;
import io.openmessaging.storage.dledger.snapshot.hook.LoadSnapshotHook;
import io.openmessaging.storage.dledger.snapshot.hook.SaveSnapshotHook;
import io.openmessaging.storage.dledger.snapshot.hook.SnapshotHook;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.opentelemetry.api.common.Attributes;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/storage/dledger/statemachine/StateMachineCaller.class */
public class StateMachineCaller extends ShutdownAbleThread {
    private static final long RETRY_ON_COMMITTED_DELAY = 1000;
    private static Logger logger = LoggerFactory.getLogger(StateMachineCaller.class);
    private final DLedgerStore dLedgerStore;
    private final StateMachine statemachine;
    private final DLedgerEntryPusher entryPusher;
    private final MemberState memberState;
    private final BlockingQueue<ApplyTask> taskQueue;
    private final ScheduledExecutorService scheduledExecutorService;
    private final Function<ApplyEntry, Boolean> completeEntryCallback;
    private volatile DLedgerException error;
    private Optional<SnapshotManager> snapshotManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/statemachine/StateMachineCaller$ApplyTask.class */
    public static class ApplyTask {
        TaskType type;
        long committedIndex;
        long term;
        SnapshotHook snapshotHook;

        private ApplyTask() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/statemachine/StateMachineCaller$TaskType.class */
    public enum TaskType {
        COMMITTED,
        SNAPSHOT_SAVE,
        SNAPSHOT_LOAD,
        SHUTDOWN
    }

    public StateMachineCaller(DLedgerStore dLedgerStore, StateMachine stateMachine, DLedgerEntryPusher dLedgerEntryPusher) {
        super("StateMachineCaller-" + dLedgerStore.getMemberState().getSelfId(), logger);
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { // from class: io.openmessaging.storage.dledger.statemachine.StateMachineCaller.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "RetryOnCommittedScheduledThread");
            }
        });
        this.dLedgerStore = dLedgerStore;
        this.statemachine = stateMachine;
        this.entryPusher = dLedgerEntryPusher;
        this.memberState = dLedgerStore.getMemberState();
        this.taskQueue = new LinkedBlockingQueue(1024);
        if (dLedgerEntryPusher != null) {
            dLedgerEntryPusher.getClass();
            this.completeEntryCallback = dLedgerEntryPusher::completeResponseFuture;
            dLedgerEntryPusher.registerStateMachine(this);
        } else {
            this.completeEntryCallback = applyEntry -> {
                return true;
            };
        }
        this.snapshotManager = Optional.empty();
    }

    private boolean enqueueTask(ApplyTask applyTask) {
        return this.taskQueue.offer(applyTask);
    }

    public StateMachine getStateMachine() {
        return this.statemachine;
    }

    public boolean onCommitted(long j) {
        if (j <= this.memberState.getAppliedIndex()) {
            return false;
        }
        ApplyTask applyTask = new ApplyTask();
        applyTask.type = TaskType.COMMITTED;
        applyTask.committedIndex = j;
        return enqueueTask(applyTask);
    }

    public boolean onSnapshotLoad(LoadSnapshotHook loadSnapshotHook) {
        ApplyTask applyTask = new ApplyTask();
        applyTask.type = TaskType.SNAPSHOT_LOAD;
        applyTask.snapshotHook = loadSnapshotHook;
        return enqueueTask(applyTask);
    }

    public boolean onSnapshotSave(SaveSnapshotHook saveSnapshotHook) {
        ApplyTask applyTask = new ApplyTask();
        applyTask.type = TaskType.SNAPSHOT_SAVE;
        applyTask.snapshotHook = saveSnapshotHook;
        return enqueueTask(applyTask);
    }

    @Override // io.openmessaging.storage.dledger.common.ShutdownAbleThread
    public void shutdown() {
        super.shutdown();
        this.statemachine.onShutdown();
    }

    @Override // io.openmessaging.storage.dledger.common.ShutdownAbleThread
    public void doWork() {
        try {
            ApplyTask poll = this.taskQueue.poll(5L, TimeUnit.SECONDS);
            if (poll != null) {
                switch (poll.type) {
                    case COMMITTED:
                        doCommitted(poll.committedIndex);
                        break;
                    case SNAPSHOT_SAVE:
                        doSnapshotSave((SaveSnapshotHook) poll.snapshotHook);
                        break;
                    case SNAPSHOT_LOAD:
                        doSnapshotLoad((LoadSnapshotHook) poll.snapshotHook);
                        break;
                }
            }
        } catch (InterruptedException e) {
            logger.error("Error happen in stateMachineCaller when pull task from task queue", e);
        } catch (Throwable th) {
            logger.error("Apply task exception", th);
        }
    }

    private void doCommitted(long j) {
        if (this.error != null) {
            return;
        }
        long appliedIndex = this.memberState.getAppliedIndex();
        if (appliedIndex >= j) {
            return;
        }
        if (this.snapshotManager.isPresent() && (this.snapshotManager.get().isLoadingSnapshot() || this.snapshotManager.get().isSavingSnapshot())) {
            this.scheduledExecutorService.schedule(() -> {
                try {
                    onCommitted(j);
                    logger.info("Still loading or saving snapshot, retry the commit task with index: {} later", Long.valueOf(j));
                } catch (Throwable th) {
                    th.printStackTrace();
                }
            }, RETRY_ON_COMMITTED_DELAY, TimeUnit.MILLISECONDS);
            return;
        }
        ApplyEntryIterator applyEntryIterator = new ApplyEntryIterator(this.dLedgerStore, j, appliedIndex, this.completeEntryCallback);
        StopWatch createStarted = StopWatch.createStarted();
        this.statemachine.onApply(applyEntryIterator);
        Attributes build = DLedgerMetricsManager.newAttributesBuilder().build();
        DLedgerMetricsManager.applyTaskLatency.record(createStarted.getTime(TimeUnit.MICROSECONDS), build);
        long index = applyEntryIterator.getIndex();
        DLedgerMetricsManager.applyTaskBatchCount.record(index - appliedIndex, build);
        this.memberState.updateAppliedIndexAndTerm(index, this.dLedgerStore.get(Long.valueOf(index)).getTerm());
        this.snapshotManager.ifPresent(snapshotManager -> {
            snapshotManager.saveSnapshot();
        });
        if (applyEntryIterator.getCompleteAckNums() != 0 || this.entryPusher == null) {
            return;
        }
        this.entryPusher.checkResponseFuturesTimeout(this.memberState.getAppliedIndex() + 1);
    }

    private void doSnapshotLoad(LoadSnapshotHook loadSnapshotHook) {
        SnapshotReader snapshotReader = loadSnapshotHook.getSnapshotReader();
        try {
            SnapshotMeta load = snapshotReader.load();
            if (load == null) {
                logger.error("Unable to load state machine meta");
                loadSnapshotHook.doCallBack(SnapshotStatus.FAIL);
                return;
            }
            long lastIncludedIndex = load.getLastIncludedIndex();
            long lastIncludedTerm = load.getLastIncludedTerm();
            if (lastIncludedIndex <= this.memberState.getAppliedIndex()) {
                logger.warn("The snapshot loading is expired");
                loadSnapshotHook.doCallBack(SnapshotStatus.EXPIRED);
                return;
            }
            StopWatch createStarted = StopWatch.createStarted();
            try {
                if (!this.statemachine.onSnapshotLoad(snapshotReader)) {
                    logger.error("Unable to load data from snapshot into state machine");
                    loadSnapshotHook.doCallBack(SnapshotStatus.FAIL);
                    return;
                }
                DLedgerMetricsManager.loadSnapshotLatency.record(createStarted.getTime(TimeUnit.MICROSECONDS), DLedgerMetricsManager.newAttributesBuilder().build());
                this.memberState.updateAppliedIndexAndTerm(lastIncludedIndex, lastIncludedTerm);
                this.memberState.leaderUpdateCommittedIndex(lastIncludedTerm, lastIncludedIndex);
                loadSnapshotHook.registerSnapshotMeta(load);
                loadSnapshotHook.doCallBack(SnapshotStatus.SUCCESS);
            } catch (Exception e) {
                e.printStackTrace();
                loadSnapshotHook.doCallBack(SnapshotStatus.FAIL);
            }
        } catch (IOException e2) {
            logger.error(e2.getMessage());
            loadSnapshotHook.doCallBack(SnapshotStatus.FAIL);
        }
    }

    private void doSnapshotSave(SaveSnapshotHook saveSnapshotHook) {
        saveSnapshotHook.registerSnapshotMeta(new SnapshotMeta(this.memberState.getAppliedIndex(), this.memberState.getAppliedTerm()));
        SnapshotWriter snapshotWriter = saveSnapshotHook.getSnapshotWriter();
        if (snapshotWriter == null) {
            return;
        }
        StopWatch createStarted = StopWatch.createStarted();
        try {
            if (this.statemachine.onSnapshotSave(snapshotWriter)) {
                DLedgerMetricsManager.saveSnapshotLatency.record(createStarted.getTime(TimeUnit.MICROSECONDS), DLedgerMetricsManager.newAttributesBuilder().build());
                saveSnapshotHook.doCallBack(SnapshotStatus.SUCCESS);
            } else {
                logger.error("Unable to save snapshot data from state machine");
                saveSnapshotHook.doCallBack(SnapshotStatus.FAIL);
            }
        } catch (Exception e) {
            e.printStackTrace();
            saveSnapshotHook.doCallBack(SnapshotStatus.FAIL);
        }
    }

    public void setError(DLedgerServer dLedgerServer, DLedgerException dLedgerException) {
        this.error = dLedgerException;
        if (this.statemachine != null) {
            this.statemachine.onError(dLedgerException);
        }
        if (dLedgerServer != null) {
            dLedgerServer.shutdown();
        }
    }

    public void registerSnapshotManager(SnapshotManager snapshotManager) {
        this.snapshotManager = Optional.of(snapshotManager);
    }

    public SnapshotManager getSnapshotManager() {
        return this.snapshotManager.orElse(null);
    }

    public DLedgerStore getdLedgerStore() {
        return this.dLedgerStore;
    }
}
