package org.axonframework.eventhandling.replay;

import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import org.axonframework.common.Assert;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:org/axonframework/eventhandling/replay/BackloggingIncomingMessageHandler.class */
/**
 * {@link IncomingMessageHandler} that keeps events arriving during a replay in an in-memory backlog and
 * publishes them to the cluster once the replay has finished.
 * <p>
 * While no replay is in progress, incoming events are passed straight to the cluster. During a replay, only
 * events whose timestamp lies after the backlog threshold (the moment the replay was prepared, minus the
 * configured time margin) are kept; older events are not backlogged. Identifiers of replayed events newer than
 * that threshold are remembered, so that backlogged events with the same identifier are skipped when the
 * backlog is processed.
 * <p>
 * Every method that touches the handler's state synchronizes on this instance, so incoming messages, replayed
 * messages and backlog processing never observe each other's partial updates.
 */
public class BackloggingIncomingMessageHandler implements IncomingMessageHandler {
    /** Whether a replay is currently in progress; guarded by this instance's monitor. */
    private boolean inReplay;
    /** Events received during the replay, in arrival order. */
    private final Queue<EventMessage> backlog;
    /** Identifiers of replayed events that are newer than the backlog threshold. */
    private final Set<String> replayedMessages;
    /** Events with a timestamp at or before this instant are neither backlogged nor tracked. */
    private DateTime backlogThreshold;
    /** Amount of time subtracted from "now" to obtain the backlog threshold when a replay starts. */
    private final Duration timeMargin;

    /**
     * Creates a handler with a time margin of 5 seconds.
     */
    public BackloggingIncomingMessageHandler() {
        this(Duration.standardSeconds(5L));
    }

    /**
     * Creates a handler using the given time margin.
     *
     * @param duration the amount of time to subtract from the replay start time when computing the threshold
     *                 after which incoming and replayed events are tracked
     */
    public BackloggingIncomingMessageHandler(Duration duration) {
        this.inReplay = false;
        // Explicit type arguments instead of raw types, so the assignments are checked by the compiler.
        this.backlog = new LinkedList<EventMessage>();
        this.replayedMessages = new HashSet<String>();
        this.timeMargin = duration;
    }

    /**
     * Marks this handler as being in replay and fixes the backlog threshold at the current time minus the
     * configured time margin.
     *
     * @param cluster the cluster about to be replayed (not used by this implementation)
     */
    @Override // org.axonframework.eventhandling.replay.IncomingMessageHandler
    public synchronized void prepareForReplay(Cluster cluster) {
        Assert.isFalse(this.inReplay, "This message handler is already performing a replay. Are you using the same instances on multiple cluster?");
        this.inReplay = true;
        this.backlogThreshold = new DateTime().minus(this.timeMargin);
    }

    /**
     * Handles events arriving at the cluster. Outside of a replay they are published to the cluster directly;
     * during a replay, those with a timestamp after the backlog threshold are appended to the backlog.
     *
     * @param cluster          the cluster to publish to when no replay is in progress
     * @param eventMessageArr  the incoming events
     */
    @Override // org.axonframework.eventhandling.replay.IncomingMessageHandler
    public synchronized void onIncomingMessages(Cluster cluster, EventMessage... eventMessageArr) {
        if (!this.inReplay) {
            cluster.publish(eventMessageArr);
            return;
        }
        for (EventMessage eventMessage : eventMessageArr) {
            // Events at or before the threshold are dropped here; presumably the replay itself delivers them.
            if (eventMessage.getTimestamp().isAfter(this.backlogThreshold)) {
                this.backlog.add(eventMessage);
            }
        }
    }

    /**
     * Records the identifier of a replayed event when its timestamp lies after the backlog threshold, so that a
     * backlogged event with the same identifier is not published again by {@link #processBacklog(Cluster)}.
     * <p>
     * Synchronized like the other methods: it mutates the same non-thread-safe set that
     * {@link #processBacklog(Cluster)} and {@link #onReplayFailed(Cluster, RuntimeException)} read and clear,
     * and reads the threshold written by {@link #prepareForReplay(Cluster)}.
     *
     * @param domainEventMessage the event that has been replayed
     */
    @Override // org.axonframework.eventhandling.replay.IncomingMessageHandler
    public synchronized void releaseMessage(DomainEventMessage domainEventMessage) {
        if (domainEventMessage.getTimestamp().isAfter(this.backlogThreshold)) {
            this.replayedMessages.add(domainEventMessage.getIdentifier());
        }
    }

    /**
     * Ends the replay state after a failure and discards both the backlog and the recorded identifiers of
     * replayed events. Backlogged events are not published.
     *
     * @param cluster           the cluster whose replay failed (not used by this implementation)
     * @param runtimeException  the cause of the failure (not used by this implementation)
     */
    @Override // org.axonframework.eventhandling.replay.IncomingMessageHandler
    public synchronized void onReplayFailed(Cluster cluster, RuntimeException runtimeException) {
        this.inReplay = false;
        this.replayedMessages.clear();
        this.backlog.clear();
    }

    /**
     * Ends the replay state and drains the backlog in arrival order, publishing each event to the cluster
     * unless its identifier was recorded by {@link #releaseMessage(DomainEventMessage)}. The recorded
     * identifiers are cleared afterwards.
     *
     * @param cluster the cluster to publish the backlogged events to
     */
    @Override // org.axonframework.eventhandling.replay.IncomingMessageHandler
    public synchronized void processBacklog(Cluster cluster) {
        this.inReplay = false;
        while (!this.backlog.isEmpty()) {
            EventMessage poll = this.backlog.poll();
            if (poll != null && !this.replayedMessages.contains(poll.getIdentifier())) {
                cluster.publish(poll);
            }
        }
        this.replayedMessages.clear();
    }
}
