package one.tomorrow.transactionaloutbox.reactive.repository;

import io.r2dbc.spi.Row;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import lombok.Generated;
import one.tomorrow.transactionaloutbox.reactive.model.OutboxLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Mono;

@Repository
/* loaded from: input_file:one/tomorrow/transactionaloutbox/reactive/repository/OutboxLockRepository.class */
public class OutboxLockRepository {
    private static final Logger logger = LoggerFactory.getLogger(OutboxLockRepository.class);
    private final DatabaseClient db;
    private final TransactionalOperator rxtx;

    public Mono<Boolean> acquireOrRefreshLock(String str, Duration duration, boolean z) {
        Mono switchIfEmpty = selectOutboxLock(z).flatMap(outboxLock -> {
            return handleExistingLock(outboxLock, z, str, duration);
        }).switchIfEmpty(insertOutboxLock(str, duration));
        TransactionalOperator transactionalOperator = this.rxtx;
        Objects.requireNonNull(transactionalOperator);
        return ((Mono) switchIfEmpty.as(transactionalOperator::transactional)).onErrorResume(DataIntegrityViolationException.class, dataIntegrityViolationException -> {
            return handleDuplicateKey(dataIntegrityViolationException, str);
        }).onErrorResume(th -> {
            return (th instanceof DataAccessResourceFailureException) && th.toString().contains("could not obtain lock");
        }, th2 -> {
            return handleRowIsLocked(th2, str);
        });
    }

    private Mono<Boolean> handleDuplicateKey(DataIntegrityViolationException dataIntegrityViolationException, String str) {
        logger.info("Outbox lock for owner {} could not be created, another one has been created concurrently: {}", str, dataIntegrityViolationException);
        return Mono.just(false);
    }

    private Mono<Boolean> handleRowIsLocked(Throwable th, String str) {
        logger.info("Could not grab lock for owner {} - database row is locked: {}", str, th.toString());
        return Mono.just(false);
    }

    private Mono<OutboxLock> selectOutboxLock(boolean z) {
        return this.db.sql("select * from outbox_kafka_lock where id = :id" + (z ? " FOR UPDATE NOWAIT" : "")).bind("id", OutboxLock.OUTBOX_LOCK_ID).map(this::toOutboxLock).one();
    }

    private Mono<Boolean> insertOutboxLock(String str, Duration duration) {
        return Mono.defer(() -> {
            logger.debug("No outbox lock found. Creating one for {}", str);
            return this.db.sql("insert into outbox_kafka_lock (id, owner_id, valid_until) values (:id, :ownerId, :validUntil)").bind("id", OutboxLock.OUTBOX_LOCK_ID).bind("ownerId", str).bind("validUntil", Instant.now().plus((TemporalAmount) duration)).fetch().rowsUpdated().map(num -> {
                return Boolean.valueOf(num.intValue() > 0);
            });
        });
    }

    private Mono<Boolean> handleExistingLock(OutboxLock outboxLock, boolean z, String str, Duration duration) {
        if (isForeignValidLock(outboxLock, str)) {
            logger.debug("Found outbox lock with owner {}, valid until {} (now: {})", new Object[]{outboxLock.getOwnerId(), outboxLock.getValidUntil(), Instant.now()});
            return Mono.just(false);
        }
        if (!z) {
            return selectOutboxLock(true).flatMap(outboxLock2 -> {
                return handleExistingLock(outboxLock2, true, str, duration);
            });
        }
        if (str.equals(outboxLock.getOwnerId())) {
            logger.debug("Found outbox lock with requested owner {}, valid until {} - updating lock", outboxLock.getOwnerId(), outboxLock.getValidUntil());
            return this.db.sql("update outbox_kafka_lock set valid_until = :validUntil where id = :id and owner_id = :ownerId").bind("validUntil", Instant.now().plus((TemporalAmount) duration)).bind("id", OutboxLock.OUTBOX_LOCK_ID).bind("ownerId", str).fetch().rowsUpdated().map(num -> {
                return Boolean.valueOf(num.intValue() > 0);
            });
        }
        logger.info("Found expired outbox lock with owner {}, which was valid until {} - grabbing lock for {}", new Object[]{outboxLock.getOwnerId(), outboxLock.getValidUntil(), str});
        return this.db.sql("update outbox_kafka_lock set owner_id = :ownerId, valid_until = :validUntil where id = :id").bind("ownerId", str).bind("validUntil", Instant.now().plus((TemporalAmount) duration)).bind("id", OutboxLock.OUTBOX_LOCK_ID).fetch().rowsUpdated().map(num2 -> {
            return Boolean.valueOf(num2.intValue() > 0);
        });
    }

    private boolean isForeignValidLock(OutboxLock outboxLock, String str) {
        return !str.equals(outboxLock.getOwnerId()) && outboxLock.getValidUntil().isAfter(Instant.now());
    }

    private OutboxLock toOutboxLock(Row row) {
        return new OutboxLock((String) row.get("owner_id", String.class), (Instant) row.get("valid_until", Instant.class));
    }

    public Mono<Boolean> preventLockStealing(String str) {
        return lockOutboxLock(str).map(outboxLock -> {
            return true;
        }).defaultIfEmpty(false);
    }

    private Mono<OutboxLock> lockOutboxLock(String str) {
        return this.db.sql("select * from outbox_kafka_lock where owner_id = :ownerId for update").bind("ownerId", str).map(this::toOutboxLock).one();
    }

    public Mono<Void> releaseLock(String str) {
        return this.db.sql("delete from outbox_kafka_lock where owner_id = :ownerId").bind("ownerId", str).fetch().rowsUpdated().doOnNext(num -> {
            if (num.intValue() > 0) {
                logger.info("Released outbox lock for owner {}", str);
            } else {
                logger.info("Outbox lock for owner {} not found, nothing released.", str);
            }
        }).then();
    }

    @Generated
    public OutboxLockRepository(DatabaseClient databaseClient, TransactionalOperator transactionalOperator) {
        this.db = databaseClient;
        this.rxtx = transactionalOperator;
    }
}
