package io.graphoenix.r2dbc.executor;

import io.graphoenix.r2dbc.connection.ConnectionProvider;
import io.graphoenix.r2dbc.handler.ParameterBinder;
import io.graphoenix.r2dbc.utils.ResultUtil;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;
import org.tinylog.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Executes SQL mutation statements on R2DBC connections obtained from a
 * {@link ConnectionProvider}.
 *
 * <p>Every public method acquires its connection through {@code usingWhen}, so the
 * connection is handed back to {@link ConnectionProvider#close} when the returned
 * publisher terminates or is cancelled. Each {@code Result} produced by the driver is
 * mapped through {@link ResultUtil#getUpdateCountFromResult}.
 *
 * <p>The {@code Stream} arguments are consumed via {@link Flux#fromStream(Stream)}, so a
 * returned publisher can only be subscribed to once.
 */
@ApplicationScoped
public class MutationExecutor {
    private final ConnectionProvider connectionProvider;
    private final ParameterBinder parameterBinder;

    @Inject
    public MutationExecutor(ConnectionProvider connectionProvider, ParameterBinder parameterBinder) {
        this.connectionProvider = connectionProvider;
        this.parameterBinder = parameterBinder;
    }

    /**
     * Runs all statements of the stream as a single R2DBC {@link Batch}.
     *
     * @param stream the SQL statements to execute; an empty stream yields an empty flux
     *               and no connection is requested
     * @return the values produced by {@link ResultUtil#getUpdateCountFromResult} for each
     *         result of the batch
     */
    public Flux<Long> executeMutationsInBatch(Stream<String> stream) {
        return Flux.fromStream(stream)
                .collectList()
                .filter(statements -> !statements.isEmpty())
                .flatMapMany(statements -> Flux.usingWhen(
                        this.connectionProvider.get(),
                        connection -> executeBatch(connection, statements),
                        this.connectionProvider::close));
    }

    /**
     * Runs the statements of the stream as several R2DBC batches of at most {@code i}
     * statements each, all on the same connection.
     *
     * @param stream the SQL statements to execute
     * @param i      the maximum number of statements per batch (the window size)
     * @return the values produced by {@link ResultUtil#getUpdateCountFromResult} for each
     *         result of every batch
     */
    public Flux<Long> executeMutationsInBatchByGroup(Stream<String> stream, int i) {
        return Flux.usingWhen(
                this.connectionProvider.get(),
                connection -> Flux.fromStream(stream)
                        .window(i)
                        .flatMap(window -> window.collectList()
                                // Skip any window that turned out to be empty.
                                .filter(statements -> !statements.isEmpty())
                                .flatMapMany(statements -> executeBatch(connection, statements))),
                this.connectionProvider::close);
    }

    /**
     * Same as {@link #executeMutations(Stream, Map)} with a {@code null} parameter map.
     *
     * @param stream the SQL statements to execute
     * @return see {@link #executeMutations(Stream, Map)}
     */
    public Mono<Long> executeMutations(Stream<String> stream) {
        return executeMutations(stream, null);
    }

    /**
     * Joins all statements with {@code ";"} and executes them as one statement.
     *
     * @param stream the SQL statements to execute; an empty stream yields an empty mono
     *               and no connection is requested
     * @param map    the parameters passed on to {@link ParameterBinder#bindParameters};
     *               may be {@code null}, in which case {@code null} is forwarded as-is
     * @return the value produced by {@link ResultUtil#getUpdateCountFromResult} for the
     *         first result emitted by the statement
     */
    public Mono<Long> executeMutations(Stream<String> stream, Map<String, Object> map) {
        return Flux.fromStream(stream)
                .collectList()
                .filter(statements -> !statements.isEmpty())
                .flatMap(statements -> Mono.usingWhen(
                        this.connectionProvider.get(),
                        connection -> {
                            String sql = String.join(";", statements);
                            return Mono.from(prepareStatement(connection, sql, map).execute())
                                    .flatMap(ResultUtil::getUpdateCountFromResult);
                        },
                        this.connectionProvider::close));
    }

    /**
     * Same as {@link #executeMutationsFlux(Stream, Map)} with a {@code null} parameter map.
     *
     * @param stream the SQL statements to execute
     * @return see {@link #executeMutationsFlux(Stream, Map)}
     */
    public Flux<Long> executeMutationsFlux(Stream<String> stream) {
        return executeMutationsFlux(stream, null);
    }

    /**
     * Executes every statement of the stream individually on one shared connection.
     * The same parameter map is bound to each statement.
     *
     * @param stream the SQL statements to execute
     * @param map    the parameters passed on to {@link ParameterBinder#bindParameters};
     *               may be {@code null}, in which case {@code null} is forwarded as-is
     * @return the values produced by {@link ResultUtil#getUpdateCountFromResult} for each
     *         result of every statement
     */
    public Flux<Long> executeMutationsFlux(Stream<String> stream, Map<String, Object> map) {
        return Flux.usingWhen(
                this.connectionProvider.get(),
                connection -> Flux.fromStream(stream)
                        .flatMap(sql -> Flux.from(prepareStatement(connection, sql, map).execute())
                                .flatMap(ResultUtil::getUpdateCountFromResult)),
                this.connectionProvider::close);
    }

    /**
     * Same as {@link #executeMutation(String, Map)} with a {@code null} parameter map.
     *
     * @param str the SQL statement to execute
     * @return see {@link #executeMutation(String, Map)}
     */
    public Mono<Long> executeMutation(String str) {
        return executeMutation(str, null);
    }

    /**
     * Executes a single statement.
     *
     * @param str the SQL statement to execute; a blank statement yields an empty mono
     *            without requesting a connection
     * @param map the parameters passed on to {@link ParameterBinder#bindParameters};
     *            may be {@code null}, in which case {@code null} is forwarded as-is
     * @return the value produced by {@link ResultUtil#getUpdateCountFromResult} for the
     *         first result emitted by the statement
     */
    public Mono<Long> executeMutation(String str, Map<String, Object> map) {
        if (str.isBlank()) {
            return Mono.empty();
        }
        return Mono.usingWhen(
                this.connectionProvider.get(),
                connection -> Mono.from(prepareStatement(connection, str, map).execute())
                        .flatMap(ResultUtil::getUpdateCountFromResult),
                this.connectionProvider::close);
    }

    /** Adds all statements to a new batch on the connection, logs their count and executes the batch. */
    private Flux<Long> executeBatch(Connection connection, List<String> statements) {
        Batch batch = connection.createBatch();
        Logger.info("execute statement count:\r\n{}", statements.size());
        statements.forEach(batch::add);
        return Flux.from(batch.execute()).flatMap(ResultUtil::getUpdateCountFromResult);
    }

    /** Logs the SQL and its parameters, creates the statement and binds the parameters to it. */
    private Statement prepareStatement(Connection connection, String sql, Map<String, Object> parameters) {
        Logger.info("execute mutation:\r\n{}", sql);
        Logger.info("sql parameters:\r\n{}", parameters);
        Statement statement = connection.createStatement(sql);
        this.parameterBinder.bindParameters(sql, statement, parameters);
        return statement;
    }
}
