package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;
import io.openmessaging.storage.dledger.common.Closure;
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.common.Status;
import io.openmessaging.storage.dledger.common.TimeoutFuture;
import io.openmessaging.storage.dledger.common.WriteClosure;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsConstant;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.snapshot.DownloadSnapshot;
import io.openmessaging.storage.dledger.snapshot.SnapshotMeta;
import io.openmessaging.storage.dledger.snapshot.SnapshotReader;
import io.openmessaging.storage.dledger.statemachine.ApplyEntry;
import io.openmessaging.storage.dledger.statemachine.StateMachineCaller;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.Pair;
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.Quota;
import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/openmessaging/storage/dledger/DLedgerEntryPusher.class */
public class DLedgerEntryPusher {
    private static final Logger LOGGER = LoggerFactory.getLogger(DLedgerEntryPusher.class);
    private final DLedgerConfig dLedgerConfig;
    private final DLedgerStore dLedgerStore;
    private final MemberState memberState;
    private final DLedgerRpcService dLedgerRpcService;
    private final EntryHandler entryHandler;
    private final QuorumAckChecker quorumAckChecker;
    private final String selfId;
    private StateMachineCaller fsmCaller;
    private final Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap();
    private final Map<Long, ConcurrentMap<Long, Closure>> pendingClosure = new ConcurrentHashMap();
    private final Map<String, EntryDispatcher> dispatcherMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/DLedgerEntryPusher$EntryDispatcher.class */
    public class EntryDispatcher extends ShutdownAbleThread {
        /** Current dispatcher state; {@code doWork()} switches on it to choose which routine to run. */
        private final AtomicReference<EntryDispatcherState> type;
        /** Wall-clock time (ms) recorded after a commit push, a truncate, or an append send; -1 until first set. */
        private long lastPushCommitTimeMs;
        /** Id of the remote peer this dispatcher pushes to. */
        private final String peerId;
        /** Index of the next entry to read from the local store and push to the peer. */
        private long writeIndex;
        /** Highest index confirmed for the peer by a compare, append or snapshot-install response; -1 initially. */
        private long matchIndex;
        /** Fixed at 1000; presumably the cap on the number of pending append batches (see pendingMap) - confirm. */
        private final int maxPendingSize = 1000;
        /** Term cached by checkNotLeaderAndFreshState(); -1 until this node has been observed as leader. */
        private long term;
        /** Leader id cached together with {@link #term}; null until first refreshed. */
        private String leaderId;
        /** Wall-clock time (ms) of the last sweep that removed acknowledged batches from pendingMap. */
        private long lastCheckLeakTimeMs;
        /** In-flight append batches: first entry index -> (send time in ms, number of entries in the batch). */
        private final ConcurrentMap<Long, Pair<Long, Integer>> pendingMap;
        /** Reusable request object into which entries are accumulated before being pushed as one batch. */
        private final PushEntryRequest batchAppendEntryRequest;
        /** Timestamp (ms) consulted by doAppendInner() to decide whether the batch interval has elapsed; starts at -1. */
        private long lastAppendEntryRequestSendTimeMs;
        /** Flow-control quota built from the configured peer push quota; used by checkQuotaAndWait(). */
        private final Quota quota;

        /**
         * Creates a dispatcher thread that replicates entries to a single peer.
         * The dispatcher starts in {@link EntryDispatcherState#COMPARE} with
         * {@code writeIndex} positioned just past the local ledger end.
         *
         * @param peerId id of the remote peer this dispatcher pushes to
         * @param logger logger handed to the {@link ShutdownAbleThread} base class
         */
        public EntryDispatcher(String peerId, Logger logger) {
            super("EntryDispatcher-" + DLedgerEntryPusher.this.memberState.getSelfId() + "-" + peerId, logger);
            this.peerId = peerId;
            this.type = new AtomicReference<>(EntryDispatcherState.COMPARE);
            this.lastPushCommitTimeMs = -1L;
            this.writeIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;
            this.matchIndex = -1L;
            // maxPendingSize is a final field initialised at its declaration;
            // assigning it again here would not compile, so it is deliberately not touched.
            this.term = -1L;
            this.leaderId = null;
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.pendingMap = new ConcurrentHashMap<>();
            this.batchAppendEntryRequest = new PushEntryRequest();
            this.lastAppendEntryRequestSendTimeMs = -1L;
            this.quota = new Quota(DLedgerEntryPusher.this.dLedgerConfig.getPeerPushQuota());
        }

        /**
         * Re-initialises {@code writeIndex} from the current ledger end and then starts the thread.
         * The index is written before {@code super.start()} so the assignment happens-before
         * anything the worker thread does; writing it afterwards could overwrite a value the
         * already-running worker had just computed.
         */
        @Override // java.lang.Thread
        public synchronized void start() {
            this.writeIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;
            super.start();
        }

        /**
         * Tells the caller whether this node is currently NOT the leader and, when it is the
         * leader, makes sure the cached {@code term}/{@code leaderId} match the member state.
         * If the cached view is stale it is refreshed under the member-state monitor and the
         * dispatcher is sent back to {@link EntryDispatcherState#COMPARE}.
         *
         * @return {@code true} when this node is not the leader, {@code false} otherwise
         */
        private boolean checkNotLeaderAndFreshState() {
            final MemberState state = DLedgerEntryPusher.this.memberState;
            if (!state.isLeader()) {
                return true;
            }
            final boolean cachedViewIsCurrent = this.term == state.currTerm()
                && this.leaderId != null
                && this.leaderId.equals(state.getLeaderId());
            if (cachedViewIsCurrent) {
                return false;
            }
            synchronized (state) {
                // Leadership may have been lost while waiting for the monitor.
                if (!state.isLeader()) {
                    return true;
                }
                final boolean selfIsLeader = state.getSelfId().equals(state.getLeaderId());
                PreConditions.check(selfIsLeader, DLedgerResponseCode.UNKNOWN);
                this.logger.info("[Push-{}->{}]Update term: {} and leaderId: {} to new term: {}, new leaderId: {}",
                    new Object[]{DLedgerEntryPusher.this.selfId, this.peerId, Long.valueOf(this.term), this.leaderId, Long.valueOf(state.currTerm()), state.getLeaderId()});
                this.term = state.currTerm();
                this.leaderId = state.getSelfId();
                changeState(EntryDispatcherState.COMPARE);
                return false;
            }
        }

        /**
         * Builds a COMPARE or TRUNCATE push request addressed to this dispatcher's peer.
         *
         * @param preLogTerm  value stored as the request's pre-log term
         * @param preLogIndex value stored as the request's pre-log index
         * @param type        request type to stamp on the request
         * @return a freshly populated request carrying the current committed index
         */
        private PushEntryRequest buildCompareOrTruncatePushRequest(long preLogTerm, long preLogIndex, PushEntryRequest.Type type) {
            final MemberState state = DLedgerEntryPusher.this.memberState;
            final PushEntryRequest request = new PushEntryRequest();
            // Routing / identity headers.
            request.setGroup(state.getGroup());
            request.setRemoteId(this.peerId);
            request.setLeaderId(this.leaderId);
            request.setLocalId(state.getSelfId());
            request.setTerm(this.term);
            // Payload specific to compare / truncate.
            request.setPreLogIndex(preLogIndex);
            request.setPreLogTerm(preLogTerm);
            request.setType(type);
            request.setCommitIndex(state.getCommittedIndex());
            return request;
        }

        /**
         * Builds a COMMIT push request that carries only the leader's current committed index.
         *
         * @return a freshly populated COMMIT request for this dispatcher's peer
         */
        private PushEntryRequest buildCommitPushRequest() {
            final MemberState state = DLedgerEntryPusher.this.memberState;
            final PushEntryRequest commitRequest = new PushEntryRequest();
            commitRequest.setGroup(state.getGroup());
            commitRequest.setRemoteId(this.peerId);
            commitRequest.setLeaderId(this.leaderId);
            commitRequest.setLocalId(state.getSelfId());
            commitRequest.setTerm(this.term);
            commitRequest.setType(PushEntryRequest.Type.COMMIT);
            commitRequest.setCommitIndex(state.getCommittedIndex());
            return commitRequest;
        }

        /**
         * Wraps a downloadable snapshot into an install-snapshot request for this dispatcher's peer.
         *
         * @param downloadSnapshot snapshot whose metadata and data are copied into the request
         * @return the populated request
         */
        private InstallSnapshotRequest buildInstallSnapshotRequest(DownloadSnapshot downloadSnapshot) {
            final MemberState state = DLedgerEntryPusher.this.memberState;
            final InstallSnapshotRequest request = new InstallSnapshotRequest();
            // Routing / identity headers.
            request.setGroup(state.getGroup());
            request.setRemoteId(this.peerId);
            request.setLeaderId(this.leaderId);
            request.setLocalId(state.getSelfId());
            request.setTerm(this.term);
            // Snapshot metadata followed by the payload itself.
            request.setLastIncludedIndex(downloadSnapshot.getMeta().getLastIncludedIndex());
            request.setLastIncludedTerm(downloadSnapshot.getMeta().getLastIncludedTerm());
            request.setData(downloadSnapshot.getData());
            return request;
        }

        /**
         * Re-stamps the reusable batch request with the current group, peer, leader, local id
         * and term, marks it as APPEND, and finally clears it.
         */
        private void resetBatchAppendEntryRequest() {
            final PushEntryRequest batch = this.batchAppendEntryRequest;
            batch.setGroup(DLedgerEntryPusher.this.memberState.getGroup());
            batch.setRemoteId(this.peerId);
            batch.setLeaderId(this.leaderId);
            batch.setLocalId(DLedgerEntryPusher.this.selfId);
            batch.setTerm(this.term);
            batch.setType(PushEntryRequest.Type.APPEND);
            // clear() stays last, exactly as before.
            batch.clear();
        }

        /**
         * Applies push flow control for an entry that is more than 1000 indexes behind the
         * ledger end: the entry size is sampled into the quota and, if the quota reports it
         * is exhausted, the thread sleeps for the time the quota says is left.
         *
         * @param dLedgerEntry entry about to be appended to the outgoing batch
         */
        private void checkQuotaAndWait(DLedgerEntry dLedgerEntry) {
            final long lag = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() - dLedgerEntry.getIndex();
            if (lag > 1000) {
                this.quota.sample(dLedgerEntry.getSize());
                if (!this.quota.validateNow()) {
                    return;
                }
                final long sleepMs = this.quota.leftNow();
                this.logger.warn("[Push-{}]Quota exhaust, will sleep {}ms", this.peerId, Long.valueOf(sleepMs));
                DLedgerUtils.sleep(sleepMs);
            }
        }

        /**
         * Reads the entry at the given index from the local store.
         *
         * @param index index of the entry to load
         * @return the entry, or {@code null} when the store reports
         *         {@code INDEX_LESS_THAN_LOCAL_BEGIN} for that index
         * @throws DLedgerException for any other store failure, or when the store returns null
         */
        private DLedgerEntry getDLedgerEntryForAppend(long index) {
            try {
                final DLedgerEntry loaded = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(index));
                PreConditions.check(loaded != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", Long.valueOf(index));
                return loaded;
            } catch (DLedgerException storeFailure) {
                if (DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN.equals(storeFailure.getCode())) {
                    this.logger.info("[Push-{}]Get INDEX_LESS_THAN_LOCAL_BEGIN when requested index is {}, try to compare", this.peerId, Long.valueOf(index));
                    return null;
                }
                throw storeFailure;
            }
        }

        /**
         * Pushes a COMMIT request to the peer if more than 1000 ms have elapsed since
         * {@code lastPushCommitTimeMs}, then records the current time.
         *
         * @throws Exception if the RPC layer fails to issue the push
         */
        private void doCommit() throws Exception {
            final boolean tooSoon = DLedgerUtils.elapsed(this.lastPushCommitTimeMs) <= 1000;
            if (tooSoon) {
                return;
            }
            final PushEntryRequest commitRequest = buildCommitPushRequest();
            DLedgerEntryPusher.this.dLedgerRpcService.push(commitRequest);
            this.lastPushCommitTimeMs = System.currentTimeMillis();
        }

        /**
         * Detects a timed-out append: if the batch starting right after the peer's water mark
         * has been pending longer than the configured max push timeout, the outgoing batch is
         * cleared and {@code writeIndex} is rewound so those entries are sent again.
         *
         * @throws Exception declared for symmetry with the other dispatcher routines
         */
        private void doCheckAppendResponse() throws Exception {
            final long waterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
            final long resendFrom = waterMark + 1;
            final Pair<Long, Integer> pending = this.pendingMap.get(Long.valueOf(resendFrom));
            if (pending == null) {
                return;
            }
            if (DLedgerUtils.elapsed(pending.getKey().longValue()) <= DLedgerEntryPusher.this.dLedgerConfig.getMaxPushTimeOutMs()) {
                return;
            }
            this.batchAppendEntryRequest.clear();
            this.writeIndex = resendFrom;
            this.logger.warn("[Push-{}]Reset write index to {} for resending the entries which are timeout", this.peerId, Long.valueOf(resendFrom));
        }

        /**
         * Moves the dispatcher to the given state, logging the transition first.
         * Entering APPEND resets the reusable batch request. Entering COMPARE from APPEND
         * rewinds {@code writeIndex} to just past the ledger end and drops all pending batches.
         *
         * @param entryDispatcherState state to switch to
         */
        private synchronized void changeState(EntryDispatcherState entryDispatcherState) {
            this.logger.info("[Push-{}]Change state from {} to {}, matchIndex: {}, writeIndex: {}",
                new Object[]{this.peerId, this.type.get(), entryDispatcherState, Long.valueOf(this.matchIndex), Long.valueOf(this.writeIndex)});
            switch (entryDispatcherState) {
                case COMPARE: {
                    final boolean cameFromAppend = this.type.compareAndSet(EntryDispatcherState.APPEND, EntryDispatcherState.COMPARE);
                    if (cameFromAppend) {
                        this.writeIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;
                        this.pendingMap.clear();
                    }
                    break;
                }
                case APPEND: {
                    resetBatchAppendEntryRequest();
                    break;
                }
                default:
                    // Other target states need no preparation.
                    break;
            }
            this.type.set(entryDispatcherState);
        }

        /**
         * One iteration of the dispatcher loop: when this node is the leader, run the routine
         * matching the current state; then wait briefly. Any failure is logged, the dispatcher
         * falls back to COMPARE, and the thread sleeps 500 ms before the next iteration.
         */
        @Override // io.openmessaging.storage.dledger.common.ShutdownAbleThread
        public void doWork() {
            try {
                if (!checkNotLeaderAndFreshState()) {
                    final EntryDispatcherState current = this.type.get();
                    switch (current) {
                        case COMPARE:
                            doCompare();
                            break;
                        case TRUNCATE:
                            doTruncate();
                            break;
                        case APPEND:
                            doAppend();
                            break;
                        case INSTALL_SNAPSHOT:
                            doInstallSnapshot();
                            break;
                        case COMMIT:
                            doCommit();
                            break;
                    }
                }
                // Both the leader and the non-leader path end with the same short wait.
                waitForRunning(1L);
            } catch (Throwable failure) {
                DLedgerEntryPusher.LOGGER.error("[Push-{}]Error in {} writeIndex={} matchIndex={}",
                    new Object[]{this.peerId, getName(), Long.valueOf(this.writeIndex), Long.valueOf(this.matchIndex), failure});
                changeState(EntryDispatcherState.COMPARE);
                DLedgerUtils.sleep(500L);
            }
        }

        /**
         * Searches for the index at which the peer's log matches the leader's.
         * Each round sends a COMPARE request for {@code writeIndex - 1}. On SUCCESS the match
         * index and peer water mark are updated and the dispatcher moves to TRUNCATE. On
         * INCONSISTENT_STATE {@code writeIndex} is moved using the hints in the response and
         * the loop retries. If the compare index falls before the ledger's before-begin index,
         * the dispatcher switches to INSTALL_SNAPSHOT.
         *
         * @throws Exception on RPC timeout/failure or an unexpected response code
         */
        private void doCompare() throws Exception {
            final DLedgerStore store = DLedgerEntryPusher.this.dLedgerStore;
            while (true) {
                if (checkNotLeaderAndFreshState()) {
                    return;
                }
                if (this.type.get() != EntryDispatcherState.COMPARE) {
                    return;
                }
                if (store.getLedgerEndIndex() == -1) {
                    // Nothing in the ledger to compare yet.
                    return;
                }

                final long compareIndex = this.writeIndex - 1;
                final long compareTerm;
                final long preLogIndex;
                if (compareIndex < store.getLedgerBeforeBeginIndex()) {
                    changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
                    return;
                } else if (compareIndex == store.getLedgerBeforeBeginIndex()) {
                    compareTerm = store.getLedgerBeforeBeginTerm();
                    preLogIndex = compareIndex;
                } else {
                    final DLedgerEntry entry = store.get(Long.valueOf(compareIndex));
                    PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", Long.valueOf(compareIndex));
                    compareTerm = entry.getTerm();
                    preLogIndex = entry.getIndex();
                }
                final PushEntryRequest compareRequest = buildCompareOrTruncatePushRequest(compareTerm, preLogIndex, PushEntryRequest.Type.COMPARE);

                final PushEntryResponse response = DLedgerEntryPusher.this.dLedgerRpcService.push(compareRequest).get(3L, TimeUnit.SECONDS);
                PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", Long.valueOf(compareIndex));
                final boolean matched = response.getCode() == DLedgerResponseCode.SUCCESS.getCode();
                final boolean expectedCode = matched || response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode();
                PreConditions.check(expectedCode, DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", Long.valueOf(compareIndex));

                if (matched) {
                    this.matchIndex = compareIndex;
                    DLedgerEntryPusher.this.updatePeerWaterMark(compareTerm, this.peerId, this.matchIndex);
                    changeState(EntryDispatcherState.TRUNCATE);
                    return;
                }
                // Inconsistent: pick the next index to try from the follower's hints.
                if (response.getXTerm() == -1) {
                    this.writeIndex = response.getEndIndex() + 1;
                } else {
                    this.writeIndex = response.getXIndex();
                }
            }
        }

        /**
         * Sends a TRUNCATE request for the index right after {@code matchIndex} and, once the
         * peer answers SUCCESS, switches the dispatcher to APPEND.
         *
         * @throws Exception on RPC timeout/failure, a null response, or a non-SUCCESS code
         */
        private void doTruncate() throws Exception {
            final boolean inTruncateState = this.type.get() == EntryDispatcherState.TRUNCATE;
            PreConditions.check(inTruncateState, DLedgerResponseCode.UNKNOWN);

            final long truncateIndex = this.matchIndex + 1;
            this.logger.info("[Push-{}]Will push data to truncate truncateIndex={}", this.peerId, Long.valueOf(truncateIndex));

            final PushEntryRequest truncateRequest = buildCompareOrTruncatePushRequest(-1L, truncateIndex, PushEntryRequest.Type.TRUNCATE);
            final PushEntryResponse response = DLedgerEntryPusher.this.dLedgerRpcService.push(truncateRequest).get(3L, TimeUnit.SECONDS);
            PreConditions.check(response != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", Long.valueOf(truncateIndex));
            PreConditions.check(response.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(response.getCode()), "truncateIndex=%d", Long.valueOf(truncateIndex));

            this.lastPushCommitTimeMs = System.currentTimeMillis();
            changeState(EntryDispatcherState.APPEND);
        }

        /**
         * Main append loop. While this node is leader and the dispatcher is in APPEND state:
         * <ol>
         *   <li>rewind {@code writeIndex} if the oldest pending batch has timed out;</li>
         *   <li>if everything up to the ledger end has been read, flush the partial batch
         *       (or push a commit when there is nothing to flush) and return;</li>
         *   <li>if {@code writeIndex} is not after the ledger's before-begin index, switch to
         *       INSTALL_SNAPSHOT and return;</li>
         *   <li>sweep acknowledged batches out of {@code pendingMap} when it is full or at
         *       most once per second;</li>
         *   <li>if too many batches are still pending, re-check for a timeout and return;</li>
         *   <li>otherwise append the entry at {@code writeIndex} and advance.</li>
         * </ol>
         *
         * @throws Exception propagated from the store or the RPC layer
         */
        private void doAppend() throws Exception {
            final DLedgerStore store = DLedgerEntryPusher.this.dLedgerStore;
            while (!checkNotLeaderAndFreshState() && this.type.get() == EntryDispatcherState.APPEND) {
                doCheckAppendResponse();
                if (this.writeIndex > store.getLedgerEndIndex()) {
                    // Caught up with the local ledger: flush what is buffered, or just push a commit.
                    if (this.batchAppendEntryRequest.getCount() > 0) {
                        sendBatchAppendEntryRequest();
                    } else {
                        doCommit();
                    }
                    return;
                }
                if (this.writeIndex <= store.getLedgerBeforeBeginIndex()) {
                    // The entry to send is no longer available locally.
                    this.logger.info("[Push-{}]The writeIndex={} is less than or equal to ledgerBeforeBeginIndex={}", new Object[]{this.peerId, Long.valueOf(this.writeIndex), Long.valueOf(store.getLedgerBeforeBeginIndex())});
                    changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
                    return;
                }
                if (this.pendingMap.size() >= this.maxPendingSize || DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000) {
                    // Drop batches whose last entry is already covered by the peer's water mark.
                    long peerWaterMark = DLedgerEntryPusher.this.getPeerWaterMark(this.term, this.peerId);
                    for (Map.Entry<Long, Pair<Long, Integer>> pending : this.pendingMap.entrySet()) {
                        long lastIndexOfBatch = (pending.getKey().longValue() + pending.getValue().getValue().intValue()) - 1;
                        if (lastIndexOfBatch <= peerWaterMark) {
                            this.pendingMap.remove(pending.getKey());
                        }
                    }
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (this.pendingMap.size() >= this.maxPendingSize) {
                    // Still saturated after the sweep: check for a timeout and back off.
                    doCheckAppendResponse();
                    return;
                }
                long appendedIndex = doAppendInner(this.writeIndex);
                if (appendedIndex == -1) {
                    return;
                }
                this.writeIndex = appendedIndex + 1;
            }
        }

        /**
         * Loads the entry at the given index, adds it to the outgoing batch and sends the
         * batch when batching is disabled, the batch has reached the configured size, or the
         * configured batch interval has elapsed.
         *
         * @param index index of the entry to append
         * @return the index of the appended entry, or -1 when the entry could not be loaded
         *         and the dispatcher was switched to INSTALL_SNAPSHOT
         * @throws Exception propagated from the store or the RPC layer
         */
        private long doAppendInner(long index) throws Exception {
            final DLedgerEntry entry = getDLedgerEntryForAppend(index);
            if (entry == null) {
                this.logger.error("[Push-{}]Get null entry from index={}", this.peerId, Long.valueOf(index));
                changeState(EntryDispatcherState.INSTALL_SNAPSHOT);
                return -1L;
            }
            checkQuotaAndWait(entry);
            this.batchAppendEntryRequest.addEntry(entry);

            final DLedgerConfig config = DLedgerEntryPusher.this.dLedgerConfig;
            final boolean flushNow = !config.isEnableBatchAppend()
                || this.batchAppendEntryRequest.getTotalSize() >= config.getMaxBatchAppendSize()
                || DLedgerUtils.elapsed(this.lastAppendEntryRequestSendTimeMs) >= config.getMaxBatchAppendIntervalMs();
            if (flushNow) {
                sendBatchAppendEntryRequest();
            }
            return entry.getIndex();
        }

        /**
         * Pushes the accumulated batch to the peer and registers it in {@code pendingMap}
         * keyed by its first entry index. The asynchronous callback:
         * <ul>
         *   <li>on SUCCESS records replication metrics, removes the pending record and
         *       advances {@code matchIndex} and the peer water mark;</li>
         *   <li>on INCONSISTENT_STATE sends the dispatcher back to COMPARE;</li>
         *   <li>otherwise logs the unexpected response code.</li>
         * </ul>
         * After issuing the push, the send timestamps are refreshed and the batch is cleared.
         *
         * @throws Exception if the RPC layer fails to issue the push
         */
        private void sendBatchAppendEntryRequest() throws Exception {
            this.batchAppendEntryRequest.setCommitIndex(DLedgerEntryPusher.this.memberState.getCommittedIndex());
            // Snapshot the batch attributes now: the request object is cleared and reused below.
            final long firstEntryIndex = this.batchAppendEntryRequest.getFirstEntryIndex();
            final long lastEntryIndex = this.batchAppendEntryRequest.getLastEntryIndex();
            final long lastEntryTerm = this.batchAppendEntryRequest.getLastEntryTerm();
            final long count = this.batchAppendEntryRequest.getCount();
            final long totalSize = this.batchAppendEntryRequest.getTotalSize();
            final StopWatch watch = StopWatch.createStarted();
            CompletableFuture<PushEntryResponse> push = DLedgerEntryPusher.this.dLedgerRpcService.push(this.batchAppendEntryRequest);
            this.pendingMap.put(Long.valueOf(firstEntryIndex), new Pair<>(Long.valueOf(System.currentTimeMillis()), Integer.valueOf(this.batchAppendEntryRequest.getCount())));
            push.whenComplete((pushEntryResponse, th) -> {
                try {
                    PreConditions.check(th == null, DLedgerResponseCode.UNKNOWN);
                    DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(pushEntryResponse.getCode());
                    switch (responseCode) {
                        case SUCCESS:
                            Attributes attributes = DLedgerMetricsManager.newAttributesBuilder().put(DLedgerMetricsConstant.LABEL_REMOTE_ID, this.peerId).build();
                            DLedgerMetricsManager.replicateEntryLatency.record(watch.getTime(TimeUnit.MICROSECONDS), attributes);
                            DLedgerMetricsManager.replicateEntryBatchCount.record(count, attributes);
                            DLedgerMetricsManager.replicateEntryBatchBytes.record(totalSize, attributes);
                            this.pendingMap.remove(Long.valueOf(firstEntryIndex));
                            if (lastEntryIndex > this.matchIndex) {
                                this.matchIndex = lastEntryIndex;
                                DLedgerEntryPusher.this.updatePeerWaterMark(lastEntryTerm, this.peerId, this.matchIndex);
                            }
                            break;
                        case INCONSISTENT_STATE:
                            this.logger.info("[Push-{}]Get INCONSISTENT_STATE when append entries from {} to {} when term is {}", new Object[]{this.peerId, Long.valueOf(firstEntryIndex), Long.valueOf(lastEntryIndex), Long.valueOf(this.term)});
                            changeState(EntryDispatcherState.COMPARE);
                            break;
                        default:
                            this.logger.warn("[Push-{}]Get error response code {} {}", new Object[]{this.peerId, responseCode, pushEntryResponse.baseInfo()});
                            break;
                    }
                } catch (Throwable callbackFailure) {
                    // Must not reuse the name of the lambda parameter `th`: redeclaring it does not compile.
                    this.logger.error("Failed to deal with the callback when append request return", callbackFailure);
                }
            });
            final long sentAtMs = System.currentTimeMillis();
            this.lastPushCommitTimeMs = sentAtMs;
            // Record the send time so the batch-interval check in doAppendInner() is meaningful;
            // left at its initial -1 the interval always looks elapsed and every entry is sent on its own.
            this.lastAppendEntryRequestSendTimeMs = sentAtMs;
            this.batchAppendEntryRequest.clear();
        }

        /**
         * Sends a snapshot covering {@code writeIndex} to the peer.
         * Does nothing unless this node is leader and the dispatcher is in INSTALL_SNAPSHOT
         * state. If snapshots are disabled, no suitable snapshot exists, or the download
         * payload cannot be generated, the dispatcher falls back to COMPARE. On a SUCCESS
         * response {@code matchIndex}/{@code writeIndex} are advanced past the snapshot and
         * the dispatcher moves to APPEND; any other response code sends it back to COMPARE.
         *
         * @throws Exception on RPC timeout/failure or a null response
         */
        private void doInstallSnapshot() throws Exception {
            if (checkNotLeaderAndFreshState() || this.type.get() != EntryDispatcherState.INSTALL_SNAPSHOT) {
                return;
            }
            if (DLedgerEntryPusher.this.fsmCaller.getSnapshotManager() == null) {
                this.logger.error("[DoInstallSnapshot-{}]snapshot mode is disabled", this.peerId);
                changeState(EntryDispatcherState.COMPARE);
                return;
            }
            SnapshotReader snapshotReader = DLedgerEntryPusher.this.fsmCaller.getSnapshotManager().getSnapshotReaderIncludedTargetIndex(this.writeIndex);
            if (snapshotReader == null) {
                this.logger.error("[DoInstallSnapshot-{}]get latest snapshot whose lastIncludedIndex >= {}  failed", this.peerId, Long.valueOf(this.writeIndex));
                changeState(EntryDispatcherState.COMPARE);
                return;
            }
            DownloadSnapshot snapshot = snapshotReader.generateDownloadSnapshot();
            if (snapshot == null) {
                this.logger.error("[DoInstallSnapshot-{}]generate latest snapshot for download failed, index = {}", this.peerId, Long.valueOf(this.writeIndex));
                changeState(EntryDispatcherState.COMPARE);
                return;
            }
            long lastIncludedIndex = snapshot.getMeta().getLastIncludedIndex();
            long lastIncludedTerm = snapshot.getMeta().getLastIncludedTerm();
            InstallSnapshotRequest request = buildInstallSnapshotRequest(snapshot);
            StopWatch watch = StopWatch.createStarted();
            InstallSnapshotResponse response = DLedgerEntryPusher.this.dLedgerRpcService.installSnapshot(request).get(3L, TimeUnit.SECONDS);
            // Report the snapshot's own lastIncludedIndex, matching the label in the message.
            PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "installSnapshot lastIncludedIndex=%d", Long.valueOf(lastIncludedIndex));
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(response.getCode());
            switch (responseCode) {
                case SUCCESS:
                    DLedgerMetricsManager.installSnapshotLatency.record(watch.getTime(TimeUnit.MICROSECONDS), DLedgerMetricsManager.newAttributesBuilder().put(DLedgerMetricsConstant.LABEL_REMOTE_ID, this.peerId).build());
                    // Three arguments need three placeholders, otherwise lastIncludedTerm is silently dropped.
                    this.logger.info("[DoInstallSnapshot-{}]install snapshot success, lastIncludedIndex = {}, lastIncludedTerm = {}", new Object[]{this.peerId, Long.valueOf(lastIncludedIndex), Long.valueOf(lastIncludedTerm)});
                    if (lastIncludedIndex > this.matchIndex) {
                        this.matchIndex = lastIncludedIndex;
                        this.writeIndex = this.matchIndex + 1;
                    }
                    changeState(EntryDispatcherState.APPEND);
                    return;
                case INSTALL_SNAPSHOT_ERROR:
                case INCONSISTENT_STATE:
                    this.logger.info("[DoInstallSnapshot-{}]install snapshot failed, index = {}, term = {}", new Object[]{this.peerId, Long.valueOf(this.writeIndex), Long.valueOf(this.term)});
                    changeState(EntryDispatcherState.COMPARE);
                    return;
                default:
                    this.logger.warn("[DoInstallSnapshot-{}]install snapshot failed because error response: code = {}, msg = {}, index = {}, term = {}", new Object[]{this.peerId, responseCode, response.baseInfo(), Long.valueOf(this.writeIndex), Long.valueOf(this.term)});
                    changeState(EntryDispatcherState.COMPARE);
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/DLedgerEntryPusher$EntryDispatcherState.class */
    /**
     * States of an {@code EntryDispatcher}. {@code EntryDispatcher.doWork()} switches on the
     * current state to select the routine it runs for the peer.
     */
    public enum EntryDispatcherState {
        /** Initial state; doWork() runs doCompare() to find where the peer's log matches. */
        COMPARE,
        /** doWork() runs doTruncate(), which sends a TRUNCATE request starting after the match index. */
        TRUNCATE,
        /** doWork() runs doAppend(), which pushes entries to the peer in batches. */
        APPEND,
        /** doWork() runs doInstallSnapshot(), which sends a snapshot to the peer. */
        INSTALL_SNAPSHOT,
        /** doWork() runs doCommit(), which pushes the committed index to the peer. */
        COMMIT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/DLedgerEntryPusher$EntryHandler.class */
    public class EntryHandler extends ShutdownAbleThread {
        /** Initialised to the construction time; presumably the time of the last fast-forward check - confirm against its use. */
        private long lastCheckFastForwardTimeMs;
        /** Pending APPEND requests keyed by the first entry index of the batch, each paired with its response future. */
        ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap;
        /** Bounded queue (capacity 1024) of pending COMMIT, COMPARE and TRUNCATE requests with their response futures. */
        BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests;
        /** Guards reads and writes of {@link #inflightInstallSnapshotRequest} in handleInstallSnapshot(). */
        private ReentrantLock inflightInstallSnapshotRequestLock;
        /** The install-snapshot request currently occupying the single in-flight slot, with its response future; null when empty. */
        private Pair<InstallSnapshotRequest, CompletableFuture<InstallSnapshotResponse>> inflightInstallSnapshotRequest;

        /**
         * Creates the handler thread, named after this node's id, with an empty append-request
         * map, a compare/truncate/commit queue bounded at 1024 entries, and a fresh lock for
         * the in-flight install-snapshot slot.
         *
         * @param logger logger handed to the {@link ShutdownAbleThread} base class
         */
        public EntryHandler(Logger logger) {
            super("EntryHandler-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            this.inflightInstallSnapshotRequestLock = new ReentrantLock();
            this.compareOrTruncateRequests = new ArrayBlockingQueue<>(1024);
            this.writeRequestMap = new ConcurrentHashMap<>();
            this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
        }

        /**
         * Accepts an install-snapshot request from the leader and places it in the single
         * in-flight slot.
         * <ul>
         *   <li>If the slot is empty or holds a request with a smaller last-included index,
         *       the new request takes the slot and the future of the displaced request (if
         *       any) is completed with {@code NEWER_INSTALL_SNAPSHOT_REQUEST_EXIST}.</li>
         *   <li>Otherwise the new request itself is rejected with that same code.</li>
         * </ul>
         *
         * @param installSnapshotRequest request carrying non-empty snapshot data
         * @return a future, created with a 1000 ms timeout, for the response to this request
         * @throws DLedgerException with {@code UNEXPECTED_ARGUMENT} when the request has no data
         */
        public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest installSnapshotRequest) {
            final CompletableFuture<InstallSnapshotResponse> future = new TimeoutFuture<>(1000L);
            PreConditions.check(installSnapshotRequest.getData() != null && installSnapshotRequest.getData().length > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
            final long lastIncludedIndex = installSnapshotRequest.getLastIncludedIndex();
            this.inflightInstallSnapshotRequestLock.lock();
            try {
                // The future (old or new) that loses the slot and must be answered immediately.
                CompletableFuture<InstallSnapshotResponse> rejectedFuture = null;
                if (this.inflightInstallSnapshotRequest == null || this.inflightInstallSnapshotRequest.getKey().getLastIncludedIndex() < lastIncludedIndex) {
                    this.logger.warn("[MONITOR]The install snapshot request with index {} preempt inflight slot because of newer index", Long.valueOf(lastIncludedIndex));
                    if (this.inflightInstallSnapshotRequest != null && this.inflightInstallSnapshotRequest.getValue() != null) {
                        rejectedFuture = this.inflightInstallSnapshotRequest.getValue();
                    }
                    this.inflightInstallSnapshotRequest = new Pair<>(installSnapshotRequest, future);
                } else {
                    rejectedFuture = future;
                    // Log only the in-flight index: the request object itself carries the snapshot payload.
                    this.logger.warn("[MONITOR]The install snapshot request with index {} has already existed, inflight index {}", Long.valueOf(lastIncludedIndex), Long.valueOf(this.inflightInstallSnapshotRequest.getKey().getLastIncludedIndex()));
                }
                if (rejectedFuture != null) {
                    InstallSnapshotResponse rejection = new InstallSnapshotResponse();
                    rejection.setGroup(installSnapshotRequest.getGroup());
                    rejection.setCode(DLedgerResponseCode.NEWER_INSTALL_SNAPSHOT_REQUEST_EXIST.getCode());
                    rejection.setTerm(installSnapshotRequest.getTerm());
                    rejectedFuture.complete(rejection);
                }
                return future;
            } finally {
                this.inflightInstallSnapshotRequestLock.unlock();
            }
        }

        /**
         * Accepts a push request and parks it for the worker loop ({@code doWork}).
         *
         * <ul>
         *   <li>APPEND requests are stored in {@code writeRequestMap}, keyed by their first entry
         *       index; a second request for the same index is answered with REPEATED_PUSH.</li>
         *   <li>COMMIT requests are offered to {@code compareOrTruncateRequests}.</li>
         *   <li>COMPARE and TRUNCATE requests first clear {@code writeRequestMap} and are then
         *       offered to the same queue.</li>
         *   <li>Any other type is answered with UNEXPECTED_ARGUMENT.</li>
         * </ul>
         *
         * @param pushEntryRequest the incoming request
         * @return the future that will carry the response; it is a {@code TimeoutFuture} created
         *         with the value 1000 (presumably milliseconds - confirm against TimeoutFuture)
         * @throws Exception if an APPEND request carries no entries (count is not positive)
         */
        public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest pushEntryRequest) throws Exception {
            TimeoutFuture responseFuture = new TimeoutFuture(1000L);
            switch (pushEntryRequest.getType()) {
                case APPEND: {
                    PreConditions.check(pushEntryRequest.getCount() > 0, DLedgerResponseCode.UNEXPECTED_ARGUMENT);
                    long index = pushEntryRequest.getFirstEntryIndex();
                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> existing =
                        this.writeRequestMap.putIfAbsent(Long.valueOf(index), new Pair<>(pushEntryRequest, responseFuture));
                    if (existing != null) {
                        // An append for this index is already waiting; reject the newcomer.
                        this.logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", new Object[]{Long.valueOf(index), existing.getKey().baseInfo(), pushEntryRequest.baseInfo()});
                        responseFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.REPEATED_PUSH.getCode()));
                    }
                    break;
                }
                case COMMIT:
                    offerToCompareOrTruncateQueue(pushEntryRequest, responseFuture, "compareOrTruncateRequests blockingQueue is full when put commit request");
                    break;
                case COMPARE:
                case TRUNCATE:
                    // Pending appends are discarded before the compare/truncate request is queued.
                    this.writeRequestMap.clear();
                    offerToCompareOrTruncateQueue(pushEntryRequest, responseFuture, "compareOrTruncateRequests blockingQueue is full when put compare or truncate request");
                    break;
                default:
                    this.logger.error("[BUG]Unknown type {} from {}", pushEntryRequest.getType(), pushEntryRequest.baseInfo());
                    responseFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));
                    break;
            }
            wakeup();
            return responseFuture;
        }

        /**
         * Offers a request to {@code compareOrTruncateRequests} while holding this handler's monitor;
         * when the queue rejects it, logs {@code queueFullMessage} and answers PUSH_REQUEST_IS_FULL.
         */
        private void offerToCompareOrTruncateQueue(PushEntryRequest request, CompletableFuture<PushEntryResponse> future, String queueFullMessage) {
            synchronized (this) {
                boolean accepted = this.compareOrTruncateRequests.offer(new Pair<>(request, future));
                if (!accepted) {
                    this.logger.warn(queueFullMessage);
                    future.complete(buildResponse(request, DLedgerResponseCode.PUSH_REQUEST_IS_FULL.getCode()));
                }
            }
        }

        /**
         * Creates a response that echoes the request's group and term and carries the given code.
         * The first entry index and entry count are copied as well, except for COMMIT requests.
         *
         * @param pushEntryRequest the request being answered
         * @param i                the numeric response code to set
         * @return a freshly populated response
         */
        private PushEntryResponse buildResponse(PushEntryRequest pushEntryRequest, int i) {
            final PushEntryResponse response = new PushEntryResponse();
            response.setGroup(pushEntryRequest.getGroup());
            response.setCode(i);
            response.setTerm(pushEntryRequest.getTerm());
            final boolean isCommit = pushEntryRequest.getType() == PushEntryRequest.Type.COMMIT;
            if (!isCommit) {
                response.setIndex(Long.valueOf(pushEntryRequest.getFirstEntryIndex()));
                response.setCount(pushEntryRequest.getCount());
            }
            return response;
        }

        /**
         * Appends every entry of the request to the local store as a follower, answers SUCCESS,
         * and then advances the committed index to the smaller of the local ledger end index and
         * the request's commit index. Any failure is logged and answered with INCONSISTENT_STATE.
         *
         * @param j                 the index the worker expects this request to start at
         * @param pushEntryRequest  the append request
         * @param completableFuture the future to complete with the response
         */
        private void handleDoAppend(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                PreConditions.check(pushEntryRequest.getFirstEntryIndex() == j, DLedgerResponseCode.INCONSISTENT_STATE);
                for (DLedgerEntry entry : pushEntryRequest.getEntries()) {
                    DLedgerEntryPusher.this.dLedgerStore.appendAsFollower(entry, pushEntryRequest.getTerm(), pushEntryRequest.getLeaderId());
                }
                // The ack is sent before the committed index is touched.
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
                long localEnd = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex();
                long committable = Math.min(localEnd, pushEntryRequest.getCommitIndex());
                boolean advanced = DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committable);
                if (advanced) {
                    DLedgerEntryPusher.this.fsmCaller.onCommitted(committable);
                }
            } catch (Throwable failure) {
                this.logger.error("[HandleDoAppend] writeIndex={}", Long.valueOf(j), failure);
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
        }

        /**
         * Answers a COMPARE request by checking whether the local log matches the leader's
         * (preLogIndex, preLogTerm) pair.
         *
         * <ul>
         *   <li>Both values equal to -1: SUCCESS.</li>
         *   <li>Local ledger end index below preLogIndex: INCONSISTENT_STATE with the local end
         *       index attached.</li>
         *   <li>Local term at preLogIndex equals preLogTerm: SUCCESS.</li>
         *   <li>Otherwise: INCONSISTENT_STATE carrying the conflicting local term (XTerm) and the
         *       index of the entry returned by {@code getFirstLogOfTargetTerm} (XIndex).</li>
         * </ul>
         *
         * The whole comparison runs inside one try block so that any failure (type check, store
         * lookup, missing first-log entry) completes the future with INCONSISTENT_STATE instead of
         * escaping and leaving the future pending.
         *
         * @param pushEntryRequest  the compare request
         * @param completableFuture the future to complete with the response
         * @return {@code completableFuture}
         */
        private CompletableFuture<PushEntryResponse> handleDoCompare(PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                PreConditions.check(pushEntryRequest.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
                long preLogIndex = pushEntryRequest.getPreLogIndex();
                long preLogTerm = pushEntryRequest.getPreLogTerm();
                if (preLogTerm == -1 && preLogIndex == -1) {
                    completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
                    return completableFuture;
                }
                if (DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() < preLogIndex) {
                    // The local log is too short: report where it ends.
                    PushEntryResponse shortLogResponse = buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode());
                    shortLogResponse.setEndIndex(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                    completableFuture.complete(shortLogResponse);
                    return completableFuture;
                }
                // The term at preLogIndex comes from the "before begin" marker when the index
                // coincides with it, otherwise from the stored entry itself.
                long localTerm = DLedgerEntryPusher.this.dLedgerStore.getLedgerBeforeBeginIndex() == preLogIndex
                    ? DLedgerEntryPusher.this.dLedgerStore.getLedgerBeforeBeginTerm()
                    : DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(preLogIndex)).getTerm();
                if (localTerm == preLogTerm) {
                    completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
                    return completableFuture;
                }
                DLedgerEntry firstLogOfTargetTerm = DLedgerEntryPusher.this.dLedgerStore.getFirstLogOfTargetTerm(localTerm, preLogIndex);
                PreConditions.check(firstLogOfTargetTerm != null, DLedgerResponseCode.INCONSISTENT_STATE);
                PushEntryResponse conflictResponse = buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode());
                conflictResponse.setXTerm(localTerm);
                conflictResponse.setXIndex(firstLogOfTargetTerm.getIndex());
                completableFuture.complete(conflictResponse);
            } catch (Throwable th) {
                this.logger.error("[HandleDoCompare] preLogIndex={}, preLogTerm={}", new Object[]{Long.valueOf(pushEntryRequest.getPreLogIndex()), Long.valueOf(pushEntryRequest.getPreLogTerm()), th});
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            return completableFuture;
        }

        /**
         * Handles a COMMIT request: advances the follower's committed index to the requested
         * index, capped at the local ledger end index, then answers SUCCESS. Any failure is
         * logged and answered with UNKNOWN.
         *
         * @param j                 the commit index taken from the request by the caller
         * @param pushEntryRequest  the commit request
         * @param completableFuture the future to complete with the response
         * @return {@code completableFuture}
         */
        private CompletableFuture<PushEntryResponse> handleDoCommit(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                PreConditions.check(pushEntryRequest.getCommitIndex() == j, DLedgerResponseCode.UNKNOWN);
                PreConditions.check(PushEntryRequest.Type.COMMIT == pushEntryRequest.getType(), DLedgerResponseCode.UNKNOWN);
                // Never commit past what is actually stored locally.
                long committable = Math.min(j, DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                boolean advanced = DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committable);
                if (advanced) {
                    DLedgerEntryPusher.this.fsmCaller.onCommitted(committable);
                }
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
            } catch (Throwable failure) {
                this.logger.error("[HandleDoCommit] committedIndex={}", Long.valueOf(pushEntryRequest.getCommitIndex()), failure);
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.UNKNOWN.getCode()));
            }
            return completableFuture;
        }

        /**
         * Handles a TRUNCATE request: truncates the local store at {@code j}, requires the store
         * to report {@code j - 1} as the result, answers SUCCESS, and then advances the committed
         * index to the request's commit index capped at the local ledger end index. Any failure
         * is logged and answered with INCONSISTENT_STATE.
         *
         * @param j                 the index to truncate at
         * @param pushEntryRequest  the truncate request
         * @param completableFuture the future to complete with the response
         * @return {@code completableFuture}
         */
        private CompletableFuture<PushEntryResponse> handleDoTruncate(long j, PushEntryRequest pushEntryRequest, CompletableFuture<PushEntryResponse> completableFuture) {
            try {
                this.logger.info("[HandleDoTruncate] truncateIndex={}", Long.valueOf(j));
                PreConditions.check(PushEntryRequest.Type.TRUNCATE == pushEntryRequest.getType(), DLedgerResponseCode.UNKNOWN);
                long truncateResult = DLedgerEntryPusher.this.dLedgerStore.truncate(j);
                PreConditions.check(truncateResult == j - 1, DLedgerResponseCode.INCONSISTENT_STATE);
                // The ack is sent before the committed index is touched.
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.SUCCESS.getCode()));
                long committable = Math.min(pushEntryRequest.getCommitIndex(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                boolean advanced = DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committable);
                if (advanced) {
                    DLedgerEntryPusher.this.fsmCaller.onCommitted(committable);
                }
            } catch (Throwable failure) {
                this.logger.error("[HandleDoTruncate] truncateIndex={}", Long.valueOf(j), failure);
                completableFuture.complete(buildResponse(pushEntryRequest, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
            }
            return completableFuture;
        }

        /**
         * Installs the snapshot carried by the request through the state machine caller's
         * snapshot manager and completes the future with SUCCESS when the manager reports success,
         * or INSTALL_SNAPSHOT_ERROR when it reports failure or throws.
         *
         * @param installSnapshotRequest the request carrying snapshot metadata and data
         * @param completableFuture      the future to complete with the response
         */
        private void handleDoInstallSnapshot(InstallSnapshotRequest installSnapshotRequest, CompletableFuture<InstallSnapshotResponse> completableFuture) {
            final InstallSnapshotResponse response = new InstallSnapshotResponse();
            response.setGroup(installSnapshotRequest.getGroup());
            response.copyBaseInfo(installSnapshotRequest);
            try {
                this.logger.info("[HandleDoInstallSnapshot] begin to install snapshot, request={}", installSnapshotRequest);
                SnapshotMeta meta = new SnapshotMeta(installSnapshotRequest.getLastIncludedIndex(), installSnapshotRequest.getLastIncludedTerm());
                DownloadSnapshot snapshot = new DownloadSnapshot(meta, installSnapshotRequest.getData());
                boolean installed = DLedgerEntryPusher.this.fsmCaller.getSnapshotManager().installSnapshot(snapshot);
                DLedgerResponseCode outcome = installed ? DLedgerResponseCode.SUCCESS : DLedgerResponseCode.INSTALL_SNAPSHOT_ERROR;
                response.code(outcome.getCode());
                completableFuture.complete(response);
            } catch (Throwable failure) {
                this.logger.error("[HandleDoInstallSnapshot] install snapshot failed, request={}", installSnapshotRequest, failure);
                response.code(DLedgerResponseCode.INSTALL_SNAPSHOT_ERROR.getCode());
                completableFuture.complete(response);
            }
        }

        /**
         * Sweeps the pending append requests against the current ledger end index {@code j}.
         *
         * <ul>
         *   <li>A request whose last entry index is at or below {@code j} is compared entry by
         *       entry with the store; it is answered SUCCESS when all entries match and
         *       INCONSISTENT_STATE otherwise, then removed from the map.</li>
         *   <li>If a request starting exactly at {@code j + 1} is found, the sweep stops at once.</li>
         *   <li>Among the remaining requests whose future reports a timeout, the one with the
         *       smallest first entry index is removed after the sweep and answered
         *       INCONSISTENT_STATE.</li>
         * </ul>
         *
         * @param j the current ledger end index
         */
        private void checkAppendFuture(long j) {
            long smallestTimedOutIndex = Long.MAX_VALUE;
            for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pending : this.writeRequestMap.values()) {
                PushEntryRequest request = pending.getKey();
                long firstIndex = request.getFirstEntryIndex();
                long lastIndex = request.getLastEntryIndex();
                if (lastIndex <= j) {
                    // Everything in this batch is already stored locally; verify it and ack.
                    try {
                        for (DLedgerEntry pushed : request.getEntries()) {
                            DLedgerEntry stored = DLedgerEntryPusher.this.dLedgerStore.get(Long.valueOf(pushed.getIndex()));
                            PreConditions.check(pushed.equals(stored), DLedgerResponseCode.INCONSISTENT_STATE);
                        }
                        pending.getValue().complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
                        this.logger.warn("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", Long.valueOf(lastIndex), Long.valueOf(j));
                    } catch (Throwable failure) {
                        this.logger.error("[PushFallBehind]The leader pushed an batch append entry last index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", new Object[]{Long.valueOf(lastIndex), Long.valueOf(j), failure});
                        pending.getValue().complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
                    }
                    this.writeRequestMap.remove(Long.valueOf(firstIndex));
                    continue;
                }
                if (firstIndex == j + 1) {
                    // The next expected request is present, so there is nothing abnormal to report.
                    return;
                }
                boolean timedOut = ((TimeoutFuture) pending.getValue()).isTimeOut();
                if (timedOut && firstIndex < smallestTimedOutIndex) {
                    smallestTimedOutIndex = firstIndex;
                }
            }
            if (smallestTimedOutIndex == Long.MAX_VALUE) {
                return;
            }
            Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> removed = this.writeRequestMap.remove(Long.valueOf(smallestTimedOutIndex));
            if (removed == null) {
                return;
            }
            this.logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", Long.valueOf(j), Long.valueOf(smallestTimedOutIndex));
            removed.getValue().complete(buildResponse(removed.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
        }

        /**
         * Runs {@code checkAppendFuture} when at least 1000 has elapsed since the previous check
         * (as measured by {@code DLedgerUtils.elapsed}) and there are pending append requests.
         * The check timestamp is refreshed even when the map turns out to be empty.
         *
         * @param j the current ledger end index
         */
        private void checkAbnormalFuture(long j) {
            boolean due = DLedgerUtils.elapsed(this.lastCheckFastForwardTimeMs) >= 1000;
            if (!due) {
                return;
            }
            this.lastCheckFastForwardTimeMs = System.currentTimeMillis();
            if (!this.writeRequestMap.isEmpty()) {
                checkAppendFuture(j);
            }
        }

        /**
         * When this node is not a follower, drains {@code compareOrTruncateRequests} and answers
         * every drained request with NOT_FOLLOWER. Runs while holding this handler's monitor, the
         * same monitor used when requests are offered to the queue.
         */
        private void clearCompareOrTruncateRequestsIfNeed() {
            synchronized (this) {
                if (DLedgerEntryPusher.this.memberState.isFollower() || this.compareOrTruncateRequests.isEmpty()) {
                    return;
                }
                List<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> drained = new ArrayList<>();
                this.compareOrTruncateRequests.drainTo(drained);
                for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> queued : drained) {
                    PushEntryResponse rejection = buildResponse(queued.getKey(), DLedgerResponseCode.NOT_FOLLOWER.getCode());
                    queued.getValue().complete(rejection);
                }
            }
        }

        /**
         * One iteration of the handler's worker loop.
         *
         * <ol>
         *   <li>If this node is not a follower: reject queued compare/truncate/commit requests,
         *       wait briefly and return.</li>
         *   <li>Take over the in-flight install-snapshot request (if any) under
         *       {@code inflightInstallSnapshotRequestLock} and process it.</li>
         *   <li>If the compare/truncate queue is empty: process the append request that starts
         *       right after the ledger end index, or run the abnormal-future check and wait.</li>
         *   <li>Otherwise poll one queued request and dispatch it by type.</li>
         * </ol>
         *
         * The lock is held only while the in-flight slot is read and reset, and is released in a
         * {@code finally} block so it is unlocked exactly once on every path. Any throwable from
         * the iteration is logged, followed by a 100 ms sleep via {@code DLedgerUtils.sleep}.
         */
        @Override // io.openmessaging.storage.dledger.common.ShutdownAbleThread
        public void doWork() {
            try {
                if (!DLedgerEntryPusher.this.memberState.isFollower()) {
                    clearCompareOrTruncateRequestsIfNeed();
                    waitForRunning(1L);
                    return;
                }

                // Claim the in-flight snapshot request; processing happens outside the lock.
                Pair<InstallSnapshotRequest, CompletableFuture<InstallSnapshotResponse>> installTask = null;
                this.inflightInstallSnapshotRequestLock.lock();
                try {
                    if (this.inflightInstallSnapshotRequest != null && this.inflightInstallSnapshotRequest.getKey() != null && this.inflightInstallSnapshotRequest.getValue() != null) {
                        installTask = this.inflightInstallSnapshotRequest;
                        this.inflightInstallSnapshotRequest = new Pair<>(null, null);
                    }
                } finally {
                    this.inflightInstallSnapshotRequestLock.unlock();
                }
                if (installTask != null) {
                    handleDoInstallSnapshot(installTask.getKey(), installTask.getValue());
                }

                if (this.compareOrTruncateRequests.peek() == null) {
                    long nextIndex = DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex() + 1;
                    Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> appendTask = this.writeRequestMap.remove(Long.valueOf(nextIndex));
                    if (appendTask != null) {
                        handleDoAppend(nextIndex, appendTask.getKey(), appendTask.getValue());
                        return;
                    }
                    checkAbnormalFuture(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                    waitForRunning(1L);
                    return;
                }

                Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> queued = this.compareOrTruncateRequests.poll();
                PreConditions.check(queued != null, DLedgerResponseCode.UNKNOWN);
                switch (queued.getKey().getType()) {
                    case COMMIT:
                        handleDoCommit(queued.getKey().getCommitIndex(), queued.getKey(), queued.getValue());
                        break;
                    case COMPARE:
                        handleDoCompare(queued.getKey(), queued.getValue());
                        break;
                    case TRUNCATE:
                        handleDoTruncate(queued.getKey().getPreLogIndex(), queued.getKey(), queued.getValue());
                        break;
                    default:
                        break;
                }
            } catch (Throwable th) {
                DLedgerEntryPusher.LOGGER.error("Error in {}", getName(), th);
                DLedgerUtils.sleep(100L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/openmessaging/storage/dledger/DLedgerEntryPusher$QuorumAckChecker.class */
    public class QuorumAckChecker extends ShutdownAbleThread {
        private long lastPrintWatermarkTimeMs;
        private long lastCheckLeakTimeMs;
        private long lastCheckTimeoutTimeMs;

        public QuorumAckChecker(Logger logger) {
            super("QuorumAckChecker-" + DLedgerEntryPusher.this.memberState.getSelfId(), logger);
            this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
            this.lastCheckLeakTimeMs = System.currentTimeMillis();
            this.lastCheckTimeoutTimeMs = System.currentTimeMillis();
        }

        @Override // io.openmessaging.storage.dledger.common.ShutdownAbleThread
        public void doWork() {
            try {
                if (DLedgerUtils.elapsed(this.lastPrintWatermarkTimeMs) > 3000) {
                    this.logger.info("[{}][{}] term={} ledgerBeforeBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}", new Object[]{DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.memberState.getRole(), Long.valueOf(DLedgerEntryPusher.this.memberState.currTerm()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerBeforeBeginIndex()), Long.valueOf(DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex()), Long.valueOf(DLedgerEntryPusher.this.memberState.getCommittedIndex()), JSON.toJSONString(DLedgerEntryPusher.this.peerWaterMarksByTerm), Long.valueOf(DLedgerEntryPusher.this.memberState.getAppliedIndex())});
                    this.lastPrintWatermarkTimeMs = System.currentTimeMillis();
                }
                long currTerm = DLedgerEntryPusher.this.memberState.currTerm();
                DLedgerEntryPusher.this.checkTermForPendingMap(currTerm, "QuorumAckChecker");
                DLedgerEntryPusher.this.checkTermForWaterMark(currTerm, "QuorumAckChecker");
                if (DLedgerEntryPusher.this.pendingClosure.size() > 1) {
                    for (Long l : DLedgerEntryPusher.this.pendingClosure.keySet()) {
                        if (l.longValue() != currTerm) {
                            for (Map.Entry entry : ((ConcurrentMap) DLedgerEntryPusher.this.pendingClosure.get(l)).entrySet()) {
                                this.logger.info("[TermChange] Will clear the pending closure index={} for term changed from {} to {}", new Object[]{entry.getKey(), l, Long.valueOf(currTerm)});
                                ((Closure) entry.getValue()).done(Status.error(DLedgerResponseCode.EXPIRED_TERM));
                            }
                            DLedgerEntryPusher.this.pendingClosure.remove(l);
                        }
                    }
                }
                if (DLedgerEntryPusher.this.peerWaterMarksByTerm.size() > 1) {
                    for (Long l2 : DLedgerEntryPusher.this.peerWaterMarksByTerm.keySet()) {
                        if (l2.longValue() != currTerm) {
                            this.logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", l2, Long.valueOf(currTerm));
                            DLedgerEntryPusher.this.peerWaterMarksByTerm.remove(l2);
                        }
                    }
                }
                if (DLedgerUtils.elapsed(this.lastCheckLeakTimeMs) > 1000) {
                    DLedgerEntryPusher.this.checkResponseFuturesElapsed(DLedgerEntryPusher.this.memberState.getAppliedIndex());
                    this.lastCheckLeakTimeMs = System.currentTimeMillis();
                }
                if (DLedgerUtils.elapsed(this.lastCheckTimeoutTimeMs) > 1000) {
                    DLedgerEntryPusher.this.checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1);
                    this.lastCheckTimeoutTimeMs = System.currentTimeMillis();
                }
                if (!DLedgerEntryPusher.this.memberState.isLeader()) {
                    waitForRunning(1L);
                    return;
                }
                DLedgerEntryPusher.this.updatePeerWaterMark(currTerm, DLedgerEntryPusher.this.memberState.getSelfId(), DLedgerEntryPusher.this.dLedgerStore.getLedgerEndIndex());
                List list = (List) ((Map) DLedgerEntryPusher.this.peerWaterMarksByTerm.get(Long.valueOf(currTerm))).values().stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                long longValue = ((Long) list.get(list.size() / 2)).longValue();
                if (DLedgerEntryPusher.this.memberState.leaderUpdateCommittedIndex(currTerm, longValue)) {
                    DLedgerEntryPusher.this.fsmCaller.onCommitted(longValue);
                } else {
                    waitForRunning(1L);
                }
            } catch (Throwable th) {
                DLedgerEntryPusher.LOGGER.error("Error in {}", getName(), th);
                DLedgerUtils.sleep(100L);
            }
        }
    }

    /**
     * Wires the pusher to its collaborators, creates one {@code EntryDispatcher} for every peer
     * in the member state's peer map other than this node, and creates the entry handler and
     * the quorum ack checker. No thread is started here; see {@code startup()}.
     *
     * @param dLedgerConfig     configuration; also the source of this node's id
     * @param memberState       membership and role state
     * @param dLedgerStore      the local log store
     * @param dLedgerRpcService the RPC service
     */
    public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore, DLedgerRpcService dLedgerRpcService) {
        this.dLedgerConfig = dLedgerConfig;
        this.selfId = dLedgerConfig.getSelfId();
        this.memberState = memberState;
        this.dLedgerStore = dLedgerStore;
        this.dLedgerRpcService = dLedgerRpcService;
        for (String peerId : memberState.getPeerMap().keySet()) {
            if (peerId.equals(memberState.getSelfId())) {
                // No dispatcher is needed for the local node.
                continue;
            }
            this.dispatcherMap.put(peerId, new EntryDispatcher(peerId, LOGGER));
        }
        this.entryHandler = new EntryHandler(LOGGER);
        this.quorumAckChecker = new QuorumAckChecker(LOGGER);
    }

    /** Starts the entry handler, the quorum ack checker and then every dispatcher. */
    public void startup() {
        this.entryHandler.start();
        this.quorumAckChecker.start();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.start();
        }
    }

    /** Shuts down the entry handler, the quorum ack checker and then every dispatcher. */
    public void shutdown() {
        this.entryHandler.shutdown();
        this.quorumAckChecker.shutdown();
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.shutdown();
        }
    }

    /**
     * Stores the state machine caller that is notified when the committed index advances.
     *
     * @param stateMachineCaller the caller to use from now on
     */
    public void registerStateMachine(StateMachineCaller stateMachineCaller) {
        fsmCaller = stateMachineCaller;
    }

    /**
     * Delegates a push request to the entry handler.
     *
     * @param pushEntryRequest the incoming request
     * @return the future that will carry the response
     * @throws Exception if the entry handler rejects the request
     */
    public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest pushEntryRequest) throws Exception {
        final CompletableFuture<PushEntryResponse> response = entryHandler.handlePush(pushEntryRequest);
        return response;
    }

    /**
     * Delegates an install-snapshot request to the entry handler.
     *
     * @param installSnapshotRequest the incoming request
     * @return the future that will carry the response
     */
    public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest installSnapshotRequest) {
        final CompletableFuture<InstallSnapshotResponse> response = entryHandler.handleInstallSnapshot(installSnapshotRequest);
        return response;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure a watermark map exists for term {@code j}. When none is present, a new map
     * with every peer id from the member state's peer map set to -1 is installed with
     * {@code putIfAbsent}.
     *
     * @param j   the term
     * @param str a label naming the caller, used only in the log line
     */
    public void checkTermForWaterMark(long j, String str) {
        if (!this.peerWaterMarksByTerm.containsKey(Long.valueOf(j))) {
            LOGGER.info("Initialize the watermark in {} for term={}", str, Long.valueOf(j));
            ConcurrentHashMap<String, Long> initialMarks = new ConcurrentHashMap<>();
            for (String peerId : this.memberState.getPeerMap().keySet()) {
                initialMarks.put(peerId, -1L);
            }
            this.peerWaterMarksByTerm.putIfAbsent(Long.valueOf(j), initialMarks);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes sure a pending-closure map exists for term {@code j}, installing an empty one with
     * {@code putIfAbsent} when none is present.
     *
     * @param j   the term
     * @param str a label naming the caller, used only in the log line
     */
    public void checkTermForPendingMap(long j, String str) {
        if (!this.pendingClosure.containsKey(Long.valueOf(j))) {
            LOGGER.info("Initialize the pending closure map in {} for term={}", str, Long.valueOf(j));
            this.pendingClosure.putIfAbsent(Long.valueOf(j), new ConcurrentHashMap<>());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Raises the watermark of peer {@code str} in term {@code j} to {@code j2} if {@code j2} is
     * larger than the recorded value; the watermark never moves backwards. Runs while holding
     * the monitor of {@code peerWaterMarksByTerm}.
     *
     * @param j   the term
     * @param str the peer id
     * @param j2  the candidate watermark index
     */
    public void updatePeerWaterMark(long j, String str, long j2) {
        synchronized (this.peerWaterMarksByTerm) {
            checkTermForWaterMark(j, "updatePeerWaterMark");
            Map<String, Long> marksOfTerm = this.peerWaterMarksByTerm.get(Long.valueOf(j));
            long recorded = marksOfTerm.get(str).longValue();
            if (recorded < j2) {
                marksOfTerm.put(str, Long.valueOf(j2));
            }
        }
    }

    /**
     * Returns the recorded watermark of peer {@code str} in term {@code j}, creating the term's
     * watermark map first when it does not exist yet. Runs while holding the monitor of
     * {@code peerWaterMarksByTerm}.
     *
     * @param j   the term
     * @param str the peer id
     * @return the watermark index recorded for that peer
     */
    public long getPeerWaterMark(long j, String str) {
        synchronized (this.peerWaterMarksByTerm) {
            checkTermForWaterMark(j, "getPeerWaterMark");
            return this.peerWaterMarksByTerm.get(Long.valueOf(j)).get(str).longValue();
        }
    }

    /**
     * Tells whether the number of pending closures for term {@code j} exceeds the configured
     * maximum number of pending requests.
     *
     * @param j the term
     * @return {@code true} when the pending count is strictly greater than the configured limit
     */
    public boolean isPendingFull(long j) {
        checkTermForPendingMap(j, "isPendingFull");
        int pendingCount = this.pendingClosure.get(Long.valueOf(j)).size();
        int limit = this.dLedgerConfig.getMaxPendingRequestsNum();
        return pendingCount > limit;
    }

    /**
     * Records this node's own watermark for the entry and registers the closure waiting for it.
     * A warning is logged when a closure was already registered for the same term and index.
     *
     * @param closure the closure waiting for the entry
     * @param j       the term of the entry
     * @param j2      the index of the entry
     */
    public void appendClosure(Closure closure, long j, long j2) {
        updatePeerWaterMark(j, this.memberState.getSelfId(), j2);
        checkTermForPendingMap(j, "waitAck");
        Closure previous = this.pendingClosure.get(Long.valueOf(j)).put(Long.valueOf(j2), closure);
        if (previous != null) {
            LOGGER.warn("[MONITOR] get old wait at term = {}, index= {}", Long.valueOf(j), Long.valueOf(j2));
        }
    }

    /** Calls {@code wakeup()} on every entry dispatcher. */
    public void wakeUpDispatchers() {
        for (EntryDispatcher dispatcher : this.dispatcherMap.values()) {
            dispatcher.wakeup();
        }
    }

    /**
     * Completes the closure registered in the current term for the applied entry's index.
     * For a {@code WriteClosure} the apply result is attached before the closure is marked done.
     *
     * @param applyEntry the entry that has just been applied
     * @return {@code true} if a closure was found and completed; {@code false} when the current
     *         term has no pending map or no closure for that index
     */
    public boolean completeResponseFuture(ApplyEntry applyEntry) {
        long index = applyEntry.getEntry().getIndex();
        long term = this.memberState.currTerm();
        ConcurrentMap<Long, Closure> closuresOfTerm = this.pendingClosure.get(Long.valueOf(term));
        if (closuresOfTerm == null) {
            return false;
        }
        Closure waiting = closuresOfTerm.remove(Long.valueOf(index));
        if (waiting == null) {
            return false;
        }
        if (waiting instanceof WriteClosure) {
            ((WriteClosure) waiting).setResp(applyEntry.getResp());
        }
        waiting.done(Status.ok());
        LOGGER.info("Complete closure, term = {}, index = {}", Long.valueOf(term), Long.valueOf(index));
        return true;
    }

    /**
     * Walks the current term's pending closures in index order starting at {@code j} and fails
     * each timed-out one with WAIT_QUORUM_ACK_TIMEOUT. The walk stops at the first closure that
     * has not timed out. The exclusive upper bound is
     * {@code committedIndex + maxPendingRequestsNum + 1}, reduced to {@code ledgerEndIndex + 1}
     * when it would exceed the ledger end index.
     *
     * @param j the first index to inspect
     */
    public void checkResponseFuturesTimeout(long j) {
        long term = this.memberState.currTerm();
        long upperBound = this.memberState.getCommittedIndex() + this.dLedgerConfig.getMaxPendingRequestsNum() + 1;
        if (upperBound > this.memberState.getLedgerEndIndex()) {
            upperBound = this.memberState.getLedgerEndIndex() + 1;
        }
        ConcurrentMap<Long, Closure> closuresOfTerm = this.pendingClosure.get(Long.valueOf(term));
        if (closuresOfTerm == null || closuresOfTerm.size() <= 0) {
            return;
        }
        for (long index = j; index < upperBound; index++) {
            Closure waiting = closuresOfTerm.get(Long.valueOf(index));
            if (waiting == null) {
                continue;
            }
            if (!waiting.isTimeOut()) {
                // Later closures are not inspected once a live one is found.
                return;
            }
            waiting.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
            closuresOfTerm.remove(Long.valueOf(index));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completes with {@code Status.ok()} and removes every pending closure of the current term
     * whose index is at or below {@code j}.
     *
     * <p>The term is read inside this method, so it may differ from the term the caller
     * prepared a pending map for. When the current term has no pending map (or an empty one)
     * the method returns without doing anything, matching the guard used by
     * {@code checkResponseFuturesTimeout}.
     *
     * @param j the highest index whose closures should be completed
     */
    public void checkResponseFuturesElapsed(long j) {
        ConcurrentMap<Long, Closure> closuresOfTerm = this.pendingClosure.get(Long.valueOf(this.memberState.currTerm()));
        if (closuresOfTerm == null || closuresOfTerm.isEmpty()) {
            return;
        }
        for (Map.Entry<Long, Closure> entry : closuresOfTerm.entrySet()) {
            if (entry.getKey().longValue() <= j) {
                entry.getValue().done(Status.ok());
                closuresOfTerm.remove(entry.getKey());
            }
        }
    }
}
