package io.domainlifecycles.events.spring.outbox.poll;

import io.domainlifecycles.domain.types.DomainEvent;
import io.domainlifecycles.events.spring.outbox.api.OutboxBatch;
import io.domainlifecycles.events.spring.outbox.api.ProcessingResult;
import io.domainlifecycles.events.spring.outbox.api.TransactionalOutbox;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:io/domainlifecycles/events/spring/outbox/poll/AbstractOutboxPoller.class */
/**
 * Base class for pollers that periodically fetch a batch of domain events from a
 * {@link TransactionalOutbox} and hand each event to {@link #send(DomainEvent)}.
 *
 * <p>Polling runs on a single-threaded scheduled executor owned by this instance. The schedule
 * is started from the constructor with an initial delay of 10000 ms and a period of 1000 ms;
 * both values, as well as the maximum batch size (default 100), can be changed at runtime via
 * the setters. Changing a timing value cancels the current schedule and starts a new one.
 *
 * <p>Schedule changes and {@link #shutdown()} are serialized on this instance's monitor, so the
 * setters may be called from different threads.
 *
 * <p>NOTE(review): the schedule is created inside the constructor, i.e. before a subclass
 * constructor has finished. The first run only happens after the initial delay, but subclasses
 * should not rely on late initialization in {@link #send(DomainEvent)} — confirm.
 */
public abstract class AbstractOutboxPoller {
    private static final Logger LOGGER = Logger.getLogger(AbstractOutboxPoller.class.getName());

    private final TransactionalOutbox transactionalOutbox;
    private long pollingDelayMilliseconds = 10000;
    private long pollingPeriodMilliseconds = 1000;
    private volatile int maxBatchSize = 100;
    private final ScheduledExecutorService senderExecutorService = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> sendFuture;

    /**
     * Creates the poller and immediately schedules periodic polling with the default timing.
     *
     * @param transactionalOutbox the outbox to poll, must not be {@code null}
     * @throws NullPointerException if {@code transactionalOutbox} is {@code null}
     */
    public AbstractOutboxPoller(TransactionalOutbox transactionalOutbox) {
        this.transactionalOutbox = Objects.requireNonNull(transactionalOutbox, "The OutboxSender need a non-null outbox!");
        resetSendSchedule();
    }

    /**
     * Cancels the current polling schedule (without interrupting a run in progress) and starts a
     * new one using the currently configured delay and period. Does nothing beyond cancelling if
     * the executor has already been shut down.
     */
    private synchronized void resetSendSchedule() {
        if (this.sendFuture != null) {
            this.sendFuture.cancel(false);
        }
        if (this.senderExecutorService.isShutdown()) {
            // After shutdown() the executor would reject the task; keep the setters harmless.
            return;
        }
        this.sendFuture = this.senderExecutorService.scheduleAtFixedRate(
            this::pollSafely,
            this.pollingDelayMilliseconds,
            this.pollingPeriodMilliseconds,
            TimeUnit.MILLISECONDS
        );
    }

    /**
     * Runs one polling cycle and logs any runtime failure instead of letting it escape.
     *
     * <p>An exception escaping a task scheduled with {@code scheduleAtFixedRate} suppresses all
     * subsequent executions, which would silently stop the poller after the first failure.
     */
    private void pollSafely() {
        try {
            sendEvents();
        } catch (RuntimeException e) {
            LOGGER.log(Level.SEVERE, "Polling the outbox failed; retrying on the next scheduled run", e);
        }
    }

    /**
     * Fetches at most {@code maxBatchSize} events from the outbox and passes each one to
     * {@link #send(DomainEvent)}.
     *
     * <p>Every event whose result is not {@link ProcessingResult#OK} is reported to the outbox via
     * {@code markFailed}. The batch is reported via {@code sentSuccessfully} only if every event
     * in it was sent with result {@code OK}. An empty batch is ignored.
     */
    protected void sendEvents() {
        OutboxBatch batch = this.transactionalOutbox.fetchBatchForSending(this.maxBatchSize);
        if (batch.getDomainEvents().isEmpty()) {
            return;
        }
        boolean allSent = true;
        for (DomainEvent domainEvent : batch.getDomainEvents()) {
            ProcessingResult result = send(domainEvent);
            if (!ProcessingResult.OK.equals(result)) {
                allSent = false;
                this.transactionalOutbox.markFailed(domainEvent, result);
            }
        }
        if (allSent) {
            this.transactionalOutbox.sentSuccessfully(batch);
        }
    }

    /**
     * Sends a single domain event to its destination.
     *
     * @param domainEvent the event taken from the fetched outbox batch
     * @return {@link ProcessingResult#OK} if the event was sent; any other value causes the event
     *     to be marked as failed in the outbox
     */
    protected abstract ProcessingResult send(DomainEvent domainEvent);

    /**
     * Sets the initial delay before the first poll of a (re)started schedule and restarts the
     * schedule.
     *
     * @param j the initial delay in milliseconds
     */
    public synchronized void setPollingDelayMilliseconds(long j) {
        this.pollingDelayMilliseconds = j;
        resetSendSchedule();
    }

    /**
     * Sets the period between polls and restarts the schedule.
     *
     * @param j the polling period in milliseconds, must be greater than zero
     * @throws IllegalArgumentException if {@code j} is not greater than zero
     */
    public synchronized void setPollingPeriodMilliseconds(long j) {
        // Validate before touching any state: the executor rejects non-positive periods, and
        // failing there would leave the old schedule cancelled and the invalid value stored.
        if (j <= 0) {
            throw new IllegalArgumentException("The polling period must be greater than zero, but was " + j);
        }
        this.pollingPeriodMilliseconds = j;
        resetSendSchedule();
    }

    /**
     * Sets the maximum number of events requested from the outbox per poll. Takes effect on the
     * next polling cycle; the schedule is not restarted.
     *
     * @param i the maximum batch size
     */
    public void setMaxBatchSize(int i) {
        this.maxBatchSize = i;
    }

    /**
     * Stops polling and releases the executor thread owned by this poller.
     *
     * <p>A polling cycle that is currently running is allowed to finish; no further cycles are
     * started. Calling this method more than once has no additional effect.
     */
    public synchronized void shutdown() {
        if (this.sendFuture != null) {
            this.sendFuture.cancel(false);
        }
        this.senderExecutorService.shutdown();
    }
}
