package io.army.reactive;

import io.army.criteria.BatchDmlStatement;
import io.army.criteria.DqlStatement;
import io.army.criteria.InsertStatement;
import io.army.criteria.LiteralMode;
import io.army.criteria.NarrowDmlStatement;
import io.army.criteria.SimpleDmlStatement;
import io.army.criteria.SimpleDqlStatement;
import io.army.criteria.impl.inner._BatchStatement;
import io.army.criteria.impl.inner._Insert;
import io.army.criteria.impl.inner._ReturningDml;
import io.army.criteria.impl.inner._SingleUpdate;
import io.army.criteria.impl.inner._Statement;
import io.army.meta.ChildTableMeta;
import io.army.reactive.ArmyReactiveSessionFactory;
import io.army.reactive.ArmyReactiveStmtOptions;
import io.army.reactive.executor.ReactiveExecutor;
import io.army.session.ChildUpdateException;
import io.army.session.DataAccessException;
import io.army.session.Option;
import io.army.session.SessionClosedException;
import io.army.session.SessionException;
import io.army.session.TransactionInfo;
import io.army.session._ArmySession;
import io.army.session.executor.DriverSpiHolder;
import io.army.session.record.CurrentRecord;
import io.army.session.record.ResultStates;
import io.army.stmt.BatchStmt;
import io.army.stmt.PairBatchStmt;
import io.army.stmt.PairStmt;
import io.army.stmt.SimpleStmt;
import io.army.stmt.SingleSqlStmt;
import io.army.stmt.Stmt;
import io.army.stmt.TwoStmtQueryStmt;
import io.army.util.ArmyCriteria;
import io.army.util._Collections;
import io.army.util._Exceptions;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/army/reactive/ArmyReactiveSession.class */
abstract class ArmyReactiveSession extends _ArmySession<ArmyReactiveSessionFactory> implements ReactiveSession {
    private static final AtomicIntegerFieldUpdater<ArmyReactiveSession> SESSION_CLOSED;
    private static final AtomicReferenceFieldUpdater<ArmyReactiveSession, ConcurrentMap<Object, Object>> ATTRIBUTE_MAP;
    final ReactiveExecutor stmtExecutor;
    private volatile int sessionClosed;
    private volatile ConcurrentMap<Object, Object> attributeMap;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/army/reactive/ArmyReactiveSession$ValidateBatchStatesSubscriber.class */
    public static final class ValidateBatchStatesSubscriber extends ValidateSubscriber<ResultStates> {
        private static final AtomicReferenceFieldUpdater<ValidateBatchStatesSubscriber, Throwable> ERROR = AtomicReferenceFieldUpdater.newUpdater(ValidateBatchStatesSubscriber.class, Throwable.class, "error");
        private final FluxSink<ResultStates> sink;
        private final Map<Integer, ResultStates> statesMap;
        private final ChildTableMeta<?> domainTable;
        private volatile Throwable error;

        private ValidateBatchStatesSubscriber(FluxSink<ResultStates> fluxSink, Map<Integer, ResultStates> map, ChildTableMeta<?> childTableMeta) {
            super();
            this.sink = fluxSink.onRequest(this::onRequest);
            this.statesMap = _Collections.unmodifiableMap(map);
            this.domainTable = childTableMeta;
        }

        public void onNext(ResultStates resultStates) {
            ResultStates resultStates2 = this.statesMap.get(Integer.valueOf(resultStates.resultNo()));
            if (resultStates2 == null) {
                ERROR.compareAndSet(this, null, new ChildUpdateException(String.format("Not found %s for batch item[%s] , %s", ResultStates.class.getName(), Integer.valueOf(resultStates.resultNo()), this.domainTable)));
            } else if (resultStates2.affectedRows() == resultStates.affectedRows()) {
                this.sink.next(resultStates);
            } else {
                ERROR.compareAndSet(this, null, _Exceptions.batchChildUpdateRowsError(this.domainTable, resultStates.resultNo(), resultStates2.affectedRows(), resultStates.affectedRows()));
            }
        }

        public void onError(Throwable th) {
            if (DONE.compareAndSet(this, 0, 1)) {
                Throwable th2 = ERROR.get(this);
                if (th2 == null) {
                    this.sink.error(th);
                } else {
                    this.sink.error(new ChildUpdateException(String.format("occur two error :\n1- %s\n\n2- %s ", th2.getMessage(), th.getMessage()), th));
                }
            }
        }

        public void onComplete() {
            if (DONE.compareAndSet(this, 0, 1)) {
                Throwable th = ERROR.get(this);
                if (th == null) {
                    this.sink.complete();
                } else {
                    this.sink.error(th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/army/reactive/ArmyReactiveSession$ValidateItemCountSubscriber.class */
    public static final class ValidateItemCountSubscriber<T> extends ValidateSubscriber<T> {
        private final FluxSink<T> sink;
        private final long expectedCount;
        private final AtomicLong count;

        private ValidateItemCountSubscriber(FluxSink<T> fluxSink, long j) {
            super();
            this.count = new AtomicLong(0L);
            this.sink = fluxSink.onRequest(this::onRequest);
            this.expectedCount = j;
        }

        public void onNext(T t) {
            this.count.addAndGet(1L);
            this.sink.next(t);
        }

        public void onError(Throwable th) {
            if (DONE.compareAndSet(this, 0, 1)) {
                this.sink.error(th);
            }
        }

        public void onComplete() {
            if (DONE.compareAndSet(this, 0, 1)) {
                if (this.count.get() == this.expectedCount) {
                    this.sink.complete();
                } else {
                    this.sink.error(new ChildUpdateException(String.format("parent insert row[%s] and child insert row[%s] not match.", Long.valueOf(this.expectedCount), Long.valueOf(this.count.get()))));
                }
            }
        }
    }

    /* loaded from: input_file:io/army/reactive/ArmyReactiveSession$ValidateSubscriber.class */
    private static abstract class ValidateSubscriber<T> implements Subscriber<T> {
        static final AtomicIntegerFieldUpdater<ValidateSubscriber> DONE = AtomicIntegerFieldUpdater.newUpdater(ValidateSubscriber.class, "done");
        private volatile int done;
        private Subscription s;

        private ValidateSubscriber() {
        }

        public final void onSubscribe(Subscription subscription) {
            this.s = subscription;
        }

        final void onRequest(long j) {
            Subscription subscription = this.s;
            if (subscription != null) {
                subscription.request(j);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArmyReactiveSession(ArmyReactiveSessionFactory.ReactiveSessionBuilder<?, ?> reactiveSessionBuilder) {
        super(reactiveSessionBuilder);
        this.stmtExecutor = reactiveSessionBuilder.stmtExecutor;
        if (!$assertionsDisabled && this.stmtExecutor == null) {
            throw new AssertionError();
        }
    }

    @Override // io.army.reactive.ReactiveSession
    /* renamed from: sessionFactory */
    public final ReactiveSessionFactory mo9sessionFactory() {
        return this.factory;
    }

    public final boolean isReactive() {
        return true;
    }

    public final boolean isSync() {
        return false;
    }

    public final boolean inTransaction() {
        boolean z;
        if (isClosed()) {
            throw _Exceptions.sessionClosed(this);
        }
        try {
            z = this.stmtExecutor.inTransaction();
        } catch (DataAccessException e) {
            TransactionInfo obtainTransactionInfo = obtainTransactionInfo();
            z = obtainTransactionInfo != null && obtainTransactionInfo.inTransaction();
        } catch (Exception e2) {
            throw ((RuntimeException) handleExecutionError(e2));
        }
        return z;
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOne(SimpleDqlStatement simpleDqlStatement, Class<R> cls) {
        return queryOne(simpleDqlStatement, cls, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOne(SimpleDqlStatement simpleDqlStatement, Class<R> cls, ReactiveStmtOption reactiveStmtOption) {
        return query(simpleDqlStatement, cls, reactiveStmtOption).take(2L).collect(Collectors.toCollection(_Collections::arrayList)).flatMap((v0) -> {
            return onlyRow(v0);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<Optional<R>> queryOneNullable(SimpleDqlStatement simpleDqlStatement, Class<R> cls) {
        return queryOneNullable(simpleDqlStatement, cls, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<Optional<R>> queryOneNullable(SimpleDqlStatement simpleDqlStatement, Class<R> cls, ReactiveStmtOption reactiveStmtOption) {
        return queryNullable(simpleDqlStatement, cls, reactiveStmtOption).take(2L).collect(Collectors.toCollection(_Collections::arrayList)).flatMap((v0) -> {
            return onlyRow(v0);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOneObject(SimpleDqlStatement simpleDqlStatement, Supplier<R> supplier) {
        return queryOneObject(simpleDqlStatement, supplier, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOneObject(SimpleDqlStatement simpleDqlStatement, Supplier<R> supplier, ReactiveStmtOption reactiveStmtOption) {
        return queryObject(simpleDqlStatement, supplier, reactiveStmtOption).take(2L).collect(Collectors.toCollection(_Collections::arrayList)).flatMap((v0) -> {
            return onlyRow(v0);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOneRecord(SimpleDqlStatement simpleDqlStatement, Function<CurrentRecord, R> function) {
        return queryOneRecord(simpleDqlStatement, function, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Mono<R> queryOneRecord(SimpleDqlStatement simpleDqlStatement, Function<CurrentRecord, R> function, ReactiveStmtOption reactiveStmtOption) {
        return queryRecord(simpleDqlStatement, function, reactiveStmtOption).take(2L).collect(Collectors.toCollection(_Collections::arrayList)).flatMap((v0) -> {
            return onlyRow(v0);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> query(DqlStatement dqlStatement, Class<R> cls) {
        return query(dqlStatement, cls, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> query(DqlStatement dqlStatement, Class<R> cls, ReactiveStmtOption reactiveStmtOption) {
        return executeQuery(dqlStatement, reactiveStmtOption, (singleSqlStmt, reactiveStmtOption2) -> {
            return this.stmtExecutor.query(singleSqlStmt, cls, reactiveStmtOption2);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<Optional<R>> queryNullable(DqlStatement dqlStatement, Class<R> cls) {
        return queryNullable(dqlStatement, cls, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<Optional<R>> queryNullable(DqlStatement dqlStatement, Class<R> cls, ReactiveStmtOption reactiveStmtOption) {
        return executeQuery(dqlStatement, reactiveStmtOption, (singleSqlStmt, reactiveStmtOption2) -> {
            return this.stmtExecutor.queryOptional(singleSqlStmt, cls, reactiveStmtOption2);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> queryObject(DqlStatement dqlStatement, Supplier<R> supplier) {
        return queryObject(dqlStatement, supplier, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> queryObject(DqlStatement dqlStatement, Supplier<R> supplier, ReactiveStmtOption reactiveStmtOption) {
        return executeQuery(dqlStatement, reactiveStmtOption, (singleSqlStmt, reactiveStmtOption2) -> {
            return this.stmtExecutor.queryObject(singleSqlStmt, supplier, reactiveStmtOption2);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> queryRecord(DqlStatement dqlStatement, Function<CurrentRecord, R> function) {
        return queryRecord(dqlStatement, function, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <R> Flux<R> queryRecord(DqlStatement dqlStatement, Function<CurrentRecord, R> function, ReactiveStmtOption reactiveStmtOption) {
        return executeQuery(dqlStatement, reactiveStmtOption, (singleSqlStmt, reactiveStmtOption2) -> {
            return this.stmtExecutor.queryRecord(singleSqlStmt, function, reactiveStmtOption2);
        });
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> save(T t) {
        return save(t, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> save(T t, ReactiveStmtOption reactiveStmtOption) {
        return update(ArmyCriteria.insertStmt(this, t), reactiveStmtOption);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<ResultStates> update(SimpleDmlStatement simpleDmlStatement) {
        return update(simpleDmlStatement, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<ResultStates> update(SimpleDmlStatement simpleDmlStatement, ReactiveStmtOption reactiveStmtOption) {
        return simpleDmlStatement instanceof _BatchStatement ? Mono.error(_Exceptions.unexpectedStatement(simpleDmlStatement)) : simpleDmlStatement instanceof InsertStatement ? executeInsert((InsertStatement) simpleDmlStatement, replaceIfNeed(reactiveStmtOption)) : executeUpdate(simpleDmlStatement, replaceIfNeed(reactiveStmtOption));
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> batchSave(List<T> list) {
        return batchSave(list, LiteralMode.DEFAULT, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> batchSave(List<T> list, LiteralMode literalMode) {
        return batchSave(list, literalMode, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> batchSave(List<T> list, ReactiveStmtOption reactiveStmtOption) {
        return batchSave(list, LiteralMode.DEFAULT, reactiveStmtOption);
    }

    @Override // io.army.reactive.ReactiveSession
    public final <T> Mono<ResultStates> batchSave(List<T> list, LiteralMode literalMode, ReactiveStmtOption reactiveStmtOption) {
        if (list.size() == 0) {
            throw new IllegalArgumentException("domainList must non-empty.");
        }
        return update(ArmyCriteria.batchInsertStmt(this, literalMode, list), reactiveStmtOption);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Flux<ResultStates> batchUpdate(BatchDmlStatement batchDmlStatement) {
        return batchUpdate(batchDmlStatement, ArmyReactiveStmtOptions.DEFAULT);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Flux<ResultStates> batchUpdate(BatchDmlStatement batchDmlStatement, ReactiveStmtOption reactiveStmtOption) {
        Flux<ResultStates> error;
        try {
            try {
            } catch (Throwable th) {
                error = Flux.error(_ArmySession.wrapIfNeed(th));
                if (batchDmlStatement instanceof _Statement) {
                    ((_Statement) batchDmlStatement).clear();
                }
            }
            if (!(batchDmlStatement instanceof _BatchStatement)) {
                throw _Exceptions.unexpectedStatement(batchDmlStatement);
            }
            assertSession(batchDmlStatement);
            BatchStmt parseDmlStatement = parseDmlStatement(batchDmlStatement, reactiveStmtOption);
            Consumer consumer = parseDmlStatement.hasOptimistic() ? OPTIMISTIC_LOCK_VALIDATOR : null;
            if (parseDmlStatement instanceof BatchStmt) {
                Flux<ResultStates> batchUpdate = this.stmtExecutor.batchUpdate(parseDmlStatement, reactiveStmtOption, Option.EMPTY_FUNC);
                if (consumer != null) {
                    batchUpdate = batchUpdate.doOnNext(consumer);
                }
                error = batchUpdate.onErrorMap(this::handleExecutionError);
            } else {
                if (!(parseDmlStatement instanceof PairBatchStmt)) {
                    throw _Exceptions.unexpectedStmt(parseDmlStatement);
                }
                if (!inTransaction()) {
                    throw updateChildNoTransaction();
                }
                PairBatchStmt pairBatchStmt = (PairBatchStmt) parseDmlStatement;
                ChildTableMeta batchUpdateDomainTable = getBatchUpdateDomainTable(batchDmlStatement);
                if (!$assertionsDisabled && batchUpdateDomainTable == null) {
                    throw new AssertionError();
                }
                Flux<ResultStates> batchUpdate2 = this.stmtExecutor.batchUpdate(pairBatchStmt.firstStmt(), reactiveStmtOption, Option.EMPTY_FUNC);
                if (consumer != null) {
                    batchUpdate2 = batchUpdate2.doOnNext(consumer);
                }
                error = batchUpdate2.collectMap((v0) -> {
                    return v0.resultNo();
                }, resultStates -> {
                    return resultStates;
                }, _Collections::hashMap).flatMapMany(map -> {
                    return validateBatchStates(this.stmtExecutor.batchUpdate(pairBatchStmt.secondStmt(), reactiveStmtOption, Option.EMPTY_FUNC), map, batchUpdateDomainTable);
                }).onErrorMap(this::handlePairStmtError);
            }
            if (batchDmlStatement instanceof _Statement) {
                ((_Statement) batchDmlStatement).clear();
            }
            return error;
        } catch (Throwable th2) {
            if (batchDmlStatement instanceof _Statement) {
                ((_Statement) batchDmlStatement).clear();
            }
            throw th2;
        }
    }

    public final long sessionIdentifier() throws SessionException {
        if (isClosed()) {
            throw _Exceptions.sessionClosed(this);
        }
        try {
            return this.stmtExecutor.sessionIdentifier();
        } catch (Exception e) {
            throw ((RuntimeException) handleExecutionError(e));
        }
    }

    @Override // io.army.reactive.ReactiveSession
    @Nullable
    public final TransactionInfo currentTransactionInfo() {
        return obtainTransactionInfo();
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<TransactionInfo> transactionInfo() {
        Mono<TransactionInfo> onErrorMap;
        if (isClosed()) {
            onErrorMap = Mono.error(_Exceptions.sessionClosed(this));
        } else {
            TransactionInfo obtainTransactionInfo = obtainTransactionInfo();
            onErrorMap = obtainTransactionInfo == null ? this.stmtExecutor.transactionInfo().onErrorMap(this::handleExecutionError) : Mono.just(obtainTransactionInfo);
        }
        return onErrorMap;
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<TransactionInfo> sessionTransactionCharacteristics() {
        return isClosed() ? Mono.error(_Exceptions.sessionClosed(this)) : this.stmtExecutor.sessionTransactionCharacteristics(Option.EMPTY_FUNC).onErrorMap(this::handleExecutionError);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<?> setSavePoint() {
        return setSavePoint(Option.EMPTY_FUNC);
    }

    @Override // io.army.reactive.ReactiveSession
    public final Mono<?> setSavePoint(Function<Option<?>, ?> function) {
        return this.stmtExecutor.setSavePoint(function).onErrorMap(this::handleExecutionError);
    }

    public final <T> T valueOf(Option<T> option) {
        try {
            return (T) this.stmtExecutor.valueOf(option);
        } catch (Exception e) {
            throw ((RuntimeException) handleExecutionError(e));
        }
    }

    public final Set<Option<?>> optionSet() {
        try {
            return this.stmtExecutor.optionSet();
        } catch (Exception e) {
            throw ((RuntimeException) handleExecutionError(e));
        }
    }

    public final boolean isClosed() {
        boolean z;
        if (this instanceof DriverSpiHolder) {
            z = this.sessionClosed != 0 || this.stmtExecutor.isClosed();
        } else {
            z = this.sessionClosed != 0;
        }
        return z;
    }

    @Override // io.army.reactive.ReactiveCloseable
    public final <T> Mono<T> close() {
        return Mono.defer(this::closeSession);
    }

    @Nullable
    protected final Map<Object, Object> obtainAttributeMap() {
        return this.attributeMap;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [java.util.concurrent.ConcurrentMap] */
    protected final Map<Object, Object> obtainOrCreateAttributeMap() {
        ConcurrentMap<Object, Object> concurrentMap = this.attributeMap;
        if (concurrentMap != null) {
            return concurrentMap;
        }
        ConcurrentHashMap concurrentHashMap = _Collections.concurrentHashMap();
        if (!ATTRIBUTE_MAP.compareAndSet(this, null, concurrentHashMap)) {
            concurrentHashMap = (ConcurrentMap) ATTRIBUTE_MAP.get(this);
            if (!$assertionsDisabled && concurrentHashMap == null) {
                throw new AssertionError();
            }
        }
        return concurrentHashMap;
    }

    private ReactiveStmtOption defaultOption() {
        TransactionInfo obtainTransactionInfo = obtainTransactionInfo();
        return obtainTransactionInfo == null ? ArmyReactiveStmtOptions.DEFAULT : ArmyReactiveStmtOptions.overrideTimeoutIfNeed(ArmyReactiveStmtOptions.DEFAULT, obtainTransactionInfo);
    }

    private ReactiveStmtOption replaceIfNeed(ReactiveStmtOption reactiveStmtOption) {
        TransactionInfo obtainTransactionInfo;
        return ((reactiveStmtOption instanceof ArmyReactiveStmtOptions.TransactionOverrideOption) || (obtainTransactionInfo = obtainTransactionInfo()) == null) ? reactiveStmtOption : ArmyReactiveStmtOptions.overrideTimeoutIfNeed(reactiveStmtOption, obtainTransactionInfo);
    }

    private <R> Flux<R> executeQuery(DqlStatement dqlStatement, ReactiveStmtOption reactiveStmtOption, BiFunction<SingleSqlStmt, ReactiveStmtOption, Flux<R>> biFunction) {
        Flux<R> error;
        try {
            try {
                assertSession(dqlStatement);
                ReactiveStmtOption replaceIfNeed = replaceIfNeed(reactiveStmtOption);
                Stmt parseDqlStatement = parseDqlStatement(dqlStatement, replaceIfNeed);
                error = parseDqlStatement instanceof SingleSqlStmt ? biFunction.apply((SingleSqlStmt) parseDqlStatement, replaceIfNeed).onErrorMap(this::handleExecutionError) : !(parseDqlStatement instanceof PairStmt) ? Flux.error(_Exceptions.unexpectedStmt(parseDqlStatement)) : !inTransaction() ? Flux.error(updateChildNoTransaction()) : dqlStatement instanceof InsertStatement ? executePairInsertQuery((InsertStatement) dqlStatement, (PairStmt) parseDqlStatement, replaceIfNeed, biFunction).onErrorMap(this::handlePairStmtError) : Flux.error(_Exceptions.unexpectedStatement(dqlStatement));
                if (dqlStatement instanceof _Statement) {
                    ((_Statement) dqlStatement).clear();
                }
            } catch (Throwable th) {
                error = Flux.error(_ArmySession.wrapIfNeed(th));
                if (dqlStatement instanceof _Statement) {
                    ((_Statement) dqlStatement).clear();
                }
            }
            return error;
        } catch (Throwable th2) {
            if (dqlStatement instanceof _Statement) {
                ((_Statement) dqlStatement).clear();
            }
            throw th2;
        }
    }

    private <R> Flux<R> executePairInsertQuery(InsertStatement insertStatement, PairStmt pairStmt, ReactiveStmtOption reactiveStmtOption, BiFunction<SingleSqlStmt, ReactiveStmtOption, Flux<R>> biFunction) {
        _Insert._ChildInsert _childinsert = (_Insert._ChildInsert) insertStatement;
        boolean z = _childinsert.parentStmt() instanceof _ReturningDml;
        ChildTableMeta table = _childinsert.table();
        Function function = th -> {
            return _Exceptions.childInsertError(this, table, th);
        };
        return z ? biFunction.apply(pairStmt.firstStmt(), reactiveStmtOption).collect(Collectors.toCollection(_Collections::arrayList)).flatMapMany(arrayList -> {
            int size = arrayList.size();
            return size == 0 ? Flux.empty() : Flux.create(fluxSink -> {
                this.stmtExecutor.secondQuery((TwoStmtQueryStmt) pairStmt.secondStmt(), reactiveStmtOption, arrayList).onErrorMap(function).subscribe(new ValidateItemCountSubscriber(fluxSink, size));
            });
        }) : this.stmtExecutor.insert(pairStmt.firstStmt(), reactiveStmtOption).flatMapMany(resultStates -> {
            long affectedRows = resultStates.affectedRows();
            return affectedRows == 0 ? Flux.empty() : Flux.create(fluxSink -> {
                ((Flux) biFunction.apply(pairStmt.secondStmt(), reactiveStmtOption)).onErrorMap(function).subscribe(new ValidateItemCountSubscriber(fluxSink, affectedRows));
            });
        });
    }

    private Mono<ResultStates> executeInsert(InsertStatement insertStatement, ReactiveStmtOption reactiveStmtOption) {
        Mono<ResultStates> error;
        try {
            try {
                assertSession(insertStatement);
                SimpleStmt parseInsertStatement = parseInsertStatement(insertStatement);
                if (parseInsertStatement instanceof SimpleStmt) {
                    error = this.stmtExecutor.insert(parseInsertStatement, reactiveStmtOption).onErrorMap(this::handleExecutionError);
                } else if (!(parseInsertStatement instanceof PairStmt)) {
                    error = Mono.error(_Exceptions.unexpectedStmt(parseInsertStatement));
                } else if (inTransaction()) {
                    PairStmt pairStmt = (PairStmt) parseInsertStatement;
                    ChildTableMeta table = ((_Insert) insertStatement).table();
                    error = this.stmtExecutor.insert(pairStmt.firstStmt(), reactiveStmtOption).flatMap(resultStates -> {
                        return this.stmtExecutor.insert(pairStmt.secondStmt(), reactiveStmtOption).doOnSuccess(resultStates -> {
                            if (resultStates.affectedRows() != resultStates.affectedRows()) {
                                throw _Exceptions.parentChildRowsNotMatch(this, table, resultStates.affectedRows(), resultStates.affectedRows());
                            }
                        });
                    }).onErrorMap(this::handlePairStmtError);
                } else {
                    error = Mono.error(updateChildNoTransaction());
                }
                if (insertStatement instanceof _Statement) {
                    ((_Statement) insertStatement).clear();
                }
            } catch (Throwable th) {
                error = Mono.error(_ArmySession.wrapIfNeed(th));
                if (insertStatement instanceof _Statement) {
                    ((_Statement) insertStatement).clear();
                }
            }
            return error;
        } catch (Throwable th2) {
            if (insertStatement instanceof _Statement) {
                ((_Statement) insertStatement).clear();
            }
            throw th2;
        }
    }

    private Mono<ResultStates> executeUpdate(SimpleDmlStatement simpleDmlStatement, ReactiveStmtOption reactiveStmtOption) {
        Mono<ResultStates> error;
        try {
            try {
                assertSession(simpleDmlStatement);
                SimpleStmt parseDmlStatement = parseDmlStatement(simpleDmlStatement, reactiveStmtOption);
                Consumer consumer = parseDmlStatement.hasOptimistic() ? OPTIMISTIC_LOCK_VALIDATOR : null;
                if (parseDmlStatement instanceof SimpleStmt) {
                    Mono<ResultStates> update = this.stmtExecutor.update(parseDmlStatement, reactiveStmtOption, Option.EMPTY_FUNC);
                    if (consumer != null) {
                        update = update.doOnNext(consumer);
                    }
                    error = update.onErrorMap(this::handleExecutionError);
                } else if (!(parseDmlStatement instanceof PairStmt)) {
                    error = Mono.error(_Exceptions.unexpectedStmt(parseDmlStatement));
                } else if (!inTransaction()) {
                    error = Mono.error(updateChildNoTransaction());
                } else if (simpleDmlStatement instanceof NarrowDmlStatement) {
                    PairStmt pairStmt = (PairStmt) parseDmlStatement;
                    ChildTableMeta table = ((_SingleUpdate._ChildUpdate) simpleDmlStatement).table();
                    Mono<ResultStates> update2 = this.stmtExecutor.update(pairStmt.firstStmt(), reactiveStmtOption, Option.EMPTY_FUNC);
                    if (consumer != null) {
                        update2 = update2.doOnNext(consumer);
                    }
                    error = update2.flatMap(resultStates -> {
                        return this.stmtExecutor.update(pairStmt.secondStmt(), reactiveStmtOption, Option.EMPTY_FUNC).doOnSuccess(resultStates -> {
                            if (resultStates.affectedRows() != resultStates.affectedRows()) {
                                throw _Exceptions.parentChildRowsNotMatch(this, table, resultStates.affectedRows(), resultStates.affectedRows());
                            }
                        });
                    }).onErrorMap(this::handlePairStmtError);
                } else {
                    PairStmt pairStmt2 = (PairStmt) parseDmlStatement;
                    error = this.stmtExecutor.update(pairStmt2.firstStmt(), reactiveStmtOption, Option.EMPTY_FUNC).then(this.stmtExecutor.update(pairStmt2.secondStmt(), reactiveStmtOption, Option.EMPTY_FUNC));
                }
                if (simpleDmlStatement instanceof _Statement) {
                    ((_Statement) simpleDmlStatement).clear();
                }
            } catch (Throwable th) {
                error = Mono.error(_ArmySession.wrapIfNeed(th));
                if (simpleDmlStatement instanceof _Statement) {
                    ((_Statement) simpleDmlStatement).clear();
                }
            }
            return error;
        } catch (Throwable th2) {
            if (simpleDmlStatement instanceof _Statement) {
                ((_Statement) simpleDmlStatement).clear();
            }
            throw th2;
        }
    }

    private Flux<ResultStates> validateBatchStates(Flux<ResultStates> flux, Map<Integer, ResultStates> map, ChildTableMeta<?> childTableMeta) {
        return Flux.create(fluxSink -> {
            flux.subscribe(new ValidateBatchStatesSubscriber(fluxSink, map, childTableMeta));
        });
    }

    private <T> Mono<T> closeSession() {
        return !SESSION_CLOSED.compareAndSet(this, 0, 1) ? Mono.empty() : this.stmtExecutor.close();
    }

    private Throwable handleExecutionError(Throwable th) {
        if (th instanceof SessionClosedException) {
            SESSION_CLOSED.compareAndSet(this, 0, 1);
        }
        return _ArmySession.wrapIfNeed(th);
    }

    private Throwable handlePairStmtError(Throwable th) {
        if (!(th instanceof ChildUpdateException)) {
            return handleExecutionError(th);
        }
        ChildUpdateException childUpdateException = (ChildUpdateException) th;
        rollbackOnlyOnError(childUpdateException);
        return childUpdateException;
    }

    private static <R> Mono<R> onlyRow(List<R> list) {
        Mono<R> error;
        switch (list.size()) {
            case 0:
                error = Mono.empty();
                break;
            case 1:
                error = Mono.just(list.get(0));
                break;
            default:
                error = Mono.error(_Exceptions.nonSingleRow(list));
                break;
        }
        return error;
    }

    static {
        $assertionsDisabled = !ArmyReactiveSession.class.desiredAssertionStatus();
        SESSION_CLOSED = AtomicIntegerFieldUpdater.newUpdater(ArmyReactiveSession.class, "sessionClosed");
        ATTRIBUTE_MAP = AtomicReferenceFieldUpdater.newUpdater(ArmyReactiveSession.class, ConcurrentMap.class, "attributeMap");
    }
}
