package org.hswebframework.ezorm.rdb.executor.jdbc;

import java.sql.Connection;
import java.util.function.Consumer;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

/* loaded from: input_file:org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.class */
public abstract class JdbcReactiveSqlExecutor extends JdbcSqlExecutor implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(JdbcReactiveSqlExecutor.class);

    public JdbcReactiveSqlExecutor() {
        super(log);
    }

    public abstract Mono<Connection> getConnection();

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public Mono<Integer> update(Publisher<SqlRequest> publisher) {
        return Mono.deferContextual(contextView -> {
            return getConnection().flatMap(connection -> {
                return toFlux(publisher).map(sqlRequest -> {
                    return Integer.valueOf(doUpdate((Logger) contextView.getOrDefault(Logger.class, log), connection, sqlRequest));
                }).reduce((v0, v1) -> {
                    return Math.addExact(v0, v1);
                });
            }).defaultIfEmpty(0);
        });
    }

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public Mono<Void> execute(Publisher<SqlRequest> publisher) {
        return Mono.deferContextual(contextView -> {
            return getConnection().flatMap(connection -> {
                return toFlux(publisher).doOnNext(sqlRequest -> {
                    doExecute((Logger) contextView.getOrDefault(Logger.class, log), connection, sqlRequest);
                }).then();
            });
        });
    }

    @Override // org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor
    public <E> Flux<E> select(Publisher<SqlRequest> publisher, ResultWrapper<E, ?> resultWrapper) {
        return Flux.deferContextual(contextView -> {
            Logger logger = (Logger) contextView.getOrDefault(Logger.class, log);
            return Flux.create(fluxSink -> {
                Disposable.Composite composite = Disposables.composite();
                Mono subscribeOn = getConnection().flatMap(connection -> {
                    return toFlux(publisher).doOnNext(sqlRequest -> {
                        fluxSink.getClass();
                    }).then();
                }).subscribeOn(Schedulers.boundedElastic());
                Consumer consumer = r3 -> {
                    fluxSink.complete();
                };
                fluxSink.getClass();
                Consumer consumer2 = fluxSink::error;
                fluxSink.getClass();
                composite.add(subscribeOn.subscribe(consumer, consumer2, fluxSink::complete, Context.of(fluxSink.contextView())));
                fluxSink.onDispose(composite);
            });
        });
    }

    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> publisher) {
        return Flux.from(publisher);
    }
}
