package io.es4j.sql;

import io.es4j.sql.exceptions.Conflict;
import io.es4j.sql.exceptions.ConnectionFailure;
import io.es4j.sql.exceptions.DataException;
import io.es4j.sql.exceptions.GenericFailure;
import io.es4j.sql.exceptions.IntegrityContraintViolation;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.sql.misc.Constants;
import io.es4j.sql.misc.EnvVars;
import io.es4j.sql.misc.SqlError;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.core.json.JsonObject;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.mutiny.core.Promise;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.shareddata.Lock;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.PreparedStatement;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowIterator;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.RowStream;
import io.vertx.mutiny.sqlclient.SqlClient;
import io.vertx.mutiny.sqlclient.SqlConnection;
import io.vertx.mutiny.sqlclient.Transaction;
import io.vertx.mutiny.sqlclient.Tuple;
import io.vertx.mutiny.sqlclient.templates.RowMapper;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgException;
import io.vertx.pgclient.SslMode;
import io.vertx.sqlclient.PoolOptions;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.net.ConnectException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/sql/RepositoryHandler.class */
public final class RepositoryHandler extends Record {
    /** Vert.x instance handed to the constructor; exposed through {@link #vertx()}. */
    private final Vertx vertx;
    /** Connection pool; closed first by {@link #close()}. */
    private final PgPool pgPool;
    /** SQL client; closed by {@link #close()} after the pool. */
    private final SqlClient sqlClient;
    /** Configuration the retry back-off, retry count and stream batch size are read from. */
    private final JsonObject configuration;
    /** Class-wide logger used by every handler except {@code handleSelectUnique}, which receives its own. */
    private static final Logger logger = LoggerFactory.getLogger(RepositoryHandler.class);

    /**
     * Canonical constructor: stores the four components exactly as given, without
     * validation or defensive copying.
     *
     * @param vertx      Vert.x instance
     * @param pgPool     connection pool
     * @param sqlClient  SQL client
     * @param jsonObject configuration, kept by reference
     */
    public RepositoryHandler(Vertx vertx, PgPool pgPool, SqlClient sqlClient, JsonObject jsonObject) {
        this.configuration = jsonObject;
        this.sqlClient = sqlClient;
        this.pgPool = pgPool;
        this.vertx = vertx;
    }

    /**
     * Creates a handler backed by a freshly bootstrapped pool and SQL client.
     *
     * @param jsonObject configuration used to build both clients and kept by the handler
     * @param vertx      Vert.x instance the clients are created on
     * @return a new handler wrapping the pool, the client and the configuration
     */
    public static RepositoryHandler leasePool(JsonObject jsonObject, Vertx vertx) {
        // The pool is created before the client, as in the single-expression form.
        final PgPool pool = bootstrapPgPool(jsonObject, vertx);
        final SqlClient client = bootstrapSqlClient(jsonObject, vertx);
        return new RepositoryHandler(vertx, pool, client, jsonObject);
    }

    /**
     * Builds the SQL client from {@link #connectionOptions(JsonObject)} and the
     * "pooled" pool options.
     *
     * @param jsonObject configuration source
     * @param vertx      Vert.x instance the client is created on
     * @return the client returned by {@code PgPool.client}
     */
    public static SqlClient bootstrapSqlClient(JsonObject jsonObject, Vertx vertx) {
        final PgConnectOptions connectOptions = connectionOptions(jsonObject);
        final PoolOptions clientPoolOptions = pooledOptions(jsonObject);
        return PgPool.client(vertx, connectOptions, clientPoolOptions);
    }

    /**
     * Pool options for the {@link PgPool}: connection timeout (in seconds), cleaner period,
     * shared flag and a name derived from the schema. No maximum size is set here.
     *
     * @param jsonObject configuration; missing keys fall back to the {@code EnvVars} defaults
     * @return the populated options
     */
    private static PoolOptions poolOptions(JsonObject jsonObject) {
        final int connectionTimeout = jsonObject.getInteger("pgConnectionTimeOut", EnvVars.PG_CONNECTION_TIMEOUT);
        final int cleanerPeriod = jsonObject.getInteger("pgPoolCleanerPeriod", EnvVars.PG_CLEANER_PERIOD);
        final String poolName = jsonObject.getString(Constants.SCHEMA, EnvVars.SCHEMA) + "-postgres-pool";

        final PoolOptions options = new PoolOptions();
        options.setConnectionTimeoutUnit(TimeUnit.SECONDS);
        options.setConnectionTimeout(connectionTimeout);
        options.setPoolCleanerPeriod(cleanerPeriod);
        options.setShared(true);
        options.setName(poolName);
        return options;
    }

    /**
     * Pool options for the pooled {@link SqlClient}: the same timeout and cleaner settings as
     * {@link #poolOptions(JsonObject)}, plus a maximum size (default: available processors
     * times 8) and a wait-queue limit (default {@code -1}).
     *
     * @param jsonObject configuration; missing keys fall back to the defaults above
     * @return the populated options
     */
    private static PoolOptions pooledOptions(JsonObject jsonObject) {
        final int connectionTimeout = jsonObject.getInteger("pgConnectionTimeOut", EnvVars.PG_CONNECTION_TIMEOUT);
        final int cleanerPeriod = jsonObject.getInteger("pgPoolCleanerPeriod", EnvVars.PG_CLEANER_PERIOD);
        final int maxSize = jsonObject.getInteger("pgPoolMaxSize", Integer.valueOf(CpuCoreSensor.availableProcessors() * 8));
        final int maxWaitQueueSize = jsonObject.getInteger("pgMaxWaitQueueSize", -1);
        final String poolName = jsonObject.getString(Constants.SCHEMA, EnvVars.SCHEMA) + "-postgres-pooled";

        final PoolOptions options = new PoolOptions();
        options.setConnectionTimeoutUnit(TimeUnit.SECONDS);
        options.setConnectionTimeout(connectionTimeout);
        options.setPoolCleanerPeriod(cleanerPeriod);
        options.setMaxSize(maxSize);
        options.setMaxWaitQueueSize(maxWaitQueueSize);
        options.setShared(true);
        options.setName(poolName);
        return options;
    }

    /**
     * Builds the connection pool from {@link #connectionOptions(JsonObject)} and
     * {@link #poolOptions(JsonObject)}.
     *
     * @param jsonObject configuration source
     * @param vertx      Vert.x instance the pool is created on
     * @return the pool returned by {@code PgPool.pool}
     */
    public static PgPool bootstrapPgPool(JsonObject jsonObject, Vertx vertx) {
        final PgConnectOptions connectOptions = connectionOptions(jsonObject);
        final PoolOptions options = poolOptions(jsonObject);
        return PgPool.pool(vertx, connectOptions, options);
    }

    /**
     * Builds the PostgreSQL connect options from the configuration, falling back to the
     * {@code EnvVars} defaults for every missing key.
     *
     * <p>The schema value is used three times: as the metrics-name prefix, as the
     * {@code search_path} property and as the {@code application_name} property.
     *
     * @param jsonObject configuration source
     * @return the populated connect options
     */
    public static PgConnectOptions connectionOptions(JsonObject jsonObject) {
        final String schema = jsonObject.getString(Constants.SCHEMA, EnvVars.SCHEMA);

        final PgConnectOptions options = new PgConnectOptions();
        options.setMetricsName(schema + "-postgres");
        options.setTracingPolicy(TracingPolicy.PROPAGATE);
        options.setCachePreparedStatements(true);
        options.setLogActivity(jsonObject.getBoolean("logActivity", EnvVars.LOG_ACTIVITY));
        options.setReconnectAttempts(jsonObject.getInteger("pgReconnect", EnvVars.PG_RECONNECT));
        options.setSslMode(SslMode.of(jsonObject.getString("sslMode", EnvVars.SSL_MODE)));
        options.setReconnectInterval(jsonObject.getInteger("pgReConnectInterval", EnvVars.PG_CONNECT_INTERVAL));
        options.setHost(jsonObject.getString(Constants.PG_HOST, EnvVars.PG_HOST));
        options.setDatabase(jsonObject.getString(Constants.PG_DATABASE, EnvVars.PG_DATABASE));
        options.setUser(jsonObject.getString(Constants.PG_USER, EnvVars.PG_USER));
        options.setPassword(jsonObject.getString(Constants.PG_PASSWORD, EnvVars.PG_PASSWORD));
        options.setPort(jsonObject.getInteger(Constants.PG_PORT, EnvVars.PG_PORT));
        options.addProperty("search_path", schema);
        options.addProperty("application_name", schema);
        return options;
    }

    /**
     * Creates a handler whose schema is derived from a class name.
     *
     * <p>The snake-cased simple name of {@code cls} is written into {@code jsonObject}
     * under {@code Constants.SCHEMA}; the caller's object is modified in place.
     *
     * @param jsonObject configuration; its schema entry is overwritten
     * @param vertx      Vert.x instance the clients are created on
     * @param cls        class whose simple name becomes the schema
     * @return a new handler built from the updated configuration
     */
    public static RepositoryHandler leasePool(JsonObject jsonObject, Vertx vertx, Class<?> cls) {
        final String schema = camelToSnake(cls.getSimpleName());
        jsonObject.put(Constants.SCHEMA, schema);
        return leasePool(jsonObject, vertx);
    }

    /**
     * Converts a camel-case identifier to lower snake case.
     *
     * <p>An underscore is inserted between a lower-case ASCII letter and the run of
     * upper-case ASCII letters that follows it, then the whole string is lower-cased:
     * {@code "RepositoryHandler"} becomes {@code "repository_handler"} and
     * {@code "myHTTPServer"} becomes {@code "my_httpserver"}.
     *
     * <p>Lower-casing uses {@link java.util.Locale#ROOT} so the result does not depend on
     * the JVM default locale (under a Turkish locale a plain {@code toLowerCase()} maps
     * {@code 'I'} to a dotless {@code 'ı'}).
     *
     * @param str identifier to convert; must not be {@code null}
     * @return the snake-cased form
     */
    public static String camelToSnake(String str) {
        return str.replaceAll("([a-z])([A-Z]+)", "$1_$2").toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * Closes the pool and then the SQL client.
     *
     * <p>The SQL client is closed whether or not closing the pool succeeded, so a failing
     * pool shutdown no longer leaves the client open. A failure from either step is still
     * reported by the returned {@code Uni}.
     *
     * @return a {@code Uni} that resolves once both close operations have been attempted
     */
    public Uni<Void> close() {
        return this.pgPool.close()
            .onItemOrFailure().call((ignoredItem, ignoredFailure) -> this.sqlClient.close());
    }

    /**
     * Builds a wrapper for update statements that return the version of the updated row.
     *
     * <p>The returned function invokes the supplier once and maps the first row to its
     * {@code Constants.VERSION} column. An empty or {@code null} row set is reported as a
     * {@link Conflict}. Failures accepted by {@code checkError} are retried with the
     * configured back-off and attempt limit; whatever failure remains goes through
     * {@code mapError}.
     *
     * <p>The logged duration is measured from the moment this method is called, not from
     * the moment the query is subscribed to.
     *
     * @param cls entity type, used for log and error messages
     * @return function turning a query supplier into a {@code Uni} of the new version
     */
    public Function<Supplier<Uni<RowSet<Row>>>, Uni<Integer>> handleUpdate(Class<?> cls) {
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<Integer> updatedVersion = query.get().map(rows -> {
                logger.info(cls.getSimpleName() + " updated in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                final boolean noRow = rows == null || !rows.iterator().hasNext();
                return noRow ? null : rows.iterator().next().getInteger(Constants.VERSION);
            });
            final Conflict versionMismatch = new Conflict(new SqlError(null, cls.getSimpleName() + " version mismatch conflict ! unable to updated record", null, null));
            return updatedVersion
                .onItem().ifNull().failWith(versionMismatch)
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper for statements expected to yield exactly one mapped row.
     *
     * <p>An empty or {@code null} row set fails with {@code NotFound}; a row count above
     * one fails with an {@link IntegrityContraintViolation} carrying all rows. Failures
     * accepted by {@code checkError} are retried with the configured back-off and attempt
     * limit; whatever failure remains goes through {@code mapError}.
     *
     * @param cls entity type, used for log and error messages
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Uni} of the single row
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<T>> handleUpdateByKey(Class<T> cls) {
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<T> single = query.get().map(Unchecked.function(rows -> {
                logger.info(cls.getSimpleName() + " fetched in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                if (rows == null || !rows.iterator().hasNext()) {
                    throw NotFound.notFound(cls);
                }
                if (rows.rowCount() > 1) {
                    final List<T> duplicates = new ArrayList<>();
                    rows.iterator().forEachRemaining(duplicates::add);
                    throw IntegrityContraintViolation.violation(cls, duplicates);
                }
                return rows.iterator().next();
            }));
            return single
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper for batch statements that must yield exactly {@code i} rows.
     *
     * <p>All rows are collected into a list; a size different from {@code i} fails with a
     * {@link Conflict}. Failures accepted by {@code checkError} are retried with the
     * configured back-off and attempt limit; whatever failure remains goes through
     * {@code mapError}.
     *
     * @param cls entity type, passed to {@code checkError}
     * @param i   expected number of rows
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Uni} of the collected rows
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<List<T>>> handleUpdateByKeyBatch(Class<T> cls, int i) {
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<List<T>> collected = query.get()
                .onItem().transformToMulti(RowSet::toMulti)
                .collect().asList()
                .map(Unchecked.function(items -> {
                    logger.info("Fetched results in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                    if (i == items.size()) {
                        return items;
                    }
                    throw new Conflict(new SqlError("Size mismatch, should be " + i + " but is " + items.size(), "", null, null));
                }));
            return collected
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Translates low-level failures into the repository's exception types.
     *
     * <ul>
     *   <li>{@link ConnectException} becomes {@link ConnectionFailure}.</li>
     *   <li>{@link PgException} is mapped by SQL state prefix: {@code 22} to
     *       {@link DataException}, {@code 23} to {@link IntegrityContraintViolation},
     *       {@code 5} or {@code 08} to {@link ConnectionFailure}, anything else
     *       (including a missing SQL state) to {@link GenericFailure}.</li>
     *   <li>Every other throwable is returned unchanged.</li>
     * </ul>
     *
     * @param th failure to translate
     * @return the translated failure, or {@code th} itself when no mapping applies
     */
    private Throwable mapError(Throwable th) {
        if (th instanceof ConnectException) {
            return new ConnectionFailure((ConnectException) th);
        }
        if (!(th instanceof PgException)) {
            return th;
        }
        final PgException pgException = (PgException) th;
        // Read the state once and tolerate null, which previously caused an NPE here.
        final String sqlState = pgException.getSqlState();
        if (sqlState == null) {
            return new GenericFailure(pgException);
        }
        if (sqlState.startsWith("22")) {
            return new DataException(pgException);
        }
        if (sqlState.startsWith("23")) {
            return new IntegrityContraintViolation(pgException);
        }
        if (sqlState.startsWith("5") || sqlState.startsWith("08")) {
            return new ConnectionFailure(pgException);
        }
        return new GenericFailure(pgException);
    }

    /**
     * Builds a wrapper for insert statements that reports the number of affected rows.
     *
     * <p>A row count of zero fails with an {@link IntegrityContraintViolation} built from
     * {@code obj}. Failures accepted by {@code checkError} are retried with the configured
     * back-off and attempt limit; whatever failure remains goes through {@code mapError}.
     *
     * @param obj the object being inserted; its class and value feed the error report
     * @return function turning a query supplier into a {@code Uni} of the row count
     */
    public Function<Supplier<Uni<RowSet<Row>>>, Uni<Long>> handleInsert(Object obj) {
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<Long> inserted = query.get().map(rows -> {
                logger.debug(" Inserted in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                final int affected = rows.rowCount();
                return affected == 0 ? null : Long.valueOf(affected);
            });
            return inserted
                .onItem().ifNull().failWith(IntegrityContraintViolation.violation(obj.getClass(), obj))
                .onFailure(failure -> checkError(failure, obj.getClass()))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper for batch inserts that must yield exactly {@code i} rows.
     *
     * <p>All rows are collected into a list; a size different from {@code i} fails with a
     * {@link Conflict}. Failures accepted by {@code checkError} are retried with the
     * configured back-off and attempt limit; whatever failure remains goes through
     * {@code mapError}.
     *
     * @param cls entity type, used for log messages and {@code checkError}
     * @param i   expected number of rows
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Uni} of the collected rows
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<List<T>>> handleInsertBatch(Class<T> cls, int i) {
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<List<T>> collected = query.get()
                .onItem().transformToMulti(RowSet::toMulti)
                .collect().asList()
                .map(Unchecked.function(items -> {
                    logger.debug("Fetched results {} ", items);
                    final Instant finishedAt = Instant.now();
                    if (i == items.size()) {
                        logger.info(cls.getSimpleName() + " query fetched " + items.size() + " in " + Duration.between(startedAt, finishedAt).toMillis() + "ms");
                        return items;
                    }
                    throw new Conflict(new SqlError("Size mismatch, should be " + i + " but is " + items.size(), "", null, null));
                }));
            return collected
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper for delete statements that reports the number of affected rows.
     *
     * <p>A row count of zero fails with {@code NotFound}. Failures accepted by
     * {@code checkError} are retried with the configured back-off and attempt limit;
     * whatever failure remains goes through {@code mapError}.
     *
     * @param cls entity type, used for log and error messages
     * @return function turning a query supplier into a {@code Uni} of the row count
     */
    public Function<Supplier<Uni<RowSet<Row>>>, Uni<Long>> handleDelete(Class<?> cls) {
        logger.debug("Handling delete query for " + cls.getSimpleName());
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<Long> deleted = query.get().map(rows -> {
                logger.info(cls.getSimpleName() + " deleted in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                final int affected = rows.rowCount();
                return affected == 0 ? null : Long.valueOf(affected);
            });
            return deleted
                .onItem().ifNull().failWith(NotFound.notFound(cls))
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper for select statements expected to yield exactly one mapped row,
     * logging through the supplied logger instead of the class-wide one.
     *
     * <p>An empty or {@code null} row set fails with {@code NotFound}; a row count above
     * one fails with an {@link IntegrityContraintViolation} carrying all rows. Failures
     * accepted by {@code checkError} are retried with the configured back-off and attempt
     * limit; whatever failure remains goes through {@code mapError}.
     *
     * @param cls     entity type, used for log and error messages
     * @param logger2 logger receiving the debug and timing messages
     * @param <T>     mapped row type
     * @return function turning a query supplier into a {@code Uni} of the single row
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<T>> handleSelectUnique(Class<T> cls, Logger logger2) {
        logger2.debug("Handling select query for " + cls.getSimpleName());
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<T> single = query.get().map(Unchecked.function(rows -> {
                logger2.info(cls.getSimpleName() + " fetched in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                if (rows == null || !rows.iterator().hasNext()) {
                    throw NotFound.notFound(cls);
                }
                if (rows.rowCount() > 1) {
                    final List<T> duplicates = new ArrayList<>();
                    rows.iterator().forEachRemaining(duplicates::add);
                    throw IntegrityContraintViolation.violation(cls, duplicates);
                }
                return rows.iterator().next();
            }));
            return single
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Builds a wrapper that succeeds only when the query returns no rows.
     *
     * <p>If the row set contains at least one row the result fails with
     * {@code GenericFailure.duplicated(cls)}; otherwise it completes with no value.
     * Failures accepted by {@code checkError} are retried with the configured back-off and
     * attempt limit; whatever failure remains goes through {@code mapError}.
     *
     * @param cls entity type, used for log and error messages
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Uni<Void>}
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<Void>> handleExists(Class<T> cls) {
        logger.debug("Handling exists query for " + cls.getSimpleName());
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<RowSet<T>> emptyOrNull = query.get().map(rows -> {
                logger.info(cls.getSimpleName() + " fetched in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                // A null item is the marker that triggers the "duplicated" failure below.
                return rows.iterator().hasNext() ? null : rows;
            });
            return emptyOrNull
                .onItem().ifNull().failWith(GenericFailure.duplicated(cls))
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError)
                .replaceWithVoid();
        };
    }

    /**
     * Builds a wrapper for select statements returning any number of rows.
     *
     * <p>All rows are collected into a list; an empty or {@code null} list fails with
     * {@code NotFound}. Failures accepted by {@code checkError} are retried with the
     * configured back-off and attempt limit; whatever failure remains goes through
     * {@code mapError}.
     *
     * @param cls entity type, used for log and error messages
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Uni} of the collected rows
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Uni<List<T>>> handleQuery(Class<T> cls) {
        logger.debug("Handling selectQ query for " + cls.getSimpleName());
        final Instant startedAt = Instant.now();
        return query -> {
            final Uni<List<T>> collected = query.get()
                .onItem().transformToMulti(RowSet::toMulti)
                .collect().asList()
                .map(items -> {
                    final int fetched = items == null ? 0 : items.size();
                    logger.info(cls.getSimpleName() + " query fetched " + fetched + " in " + Duration.between(startedAt, Instant.now()).toMillis() + "ms");
                    return items == null || items.isEmpty() ? null : items;
                });
            return collected
                .onItem().ifNull().failWith(NotFound.notFound(cls))
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Retry predicate: decides whether a failure is worth retrying.
     *
     * <p>Only a {@link PgException} whose SQL state starts with {@code 5} or {@code 08}
     * is considered recoverable. A {@code PgException} without an SQL state is treated as
     * unrecoverable instead of raising a {@code NullPointerException} inside the retry
     * pipeline. Recoverable failures are logged at debug level, all other
     * {@code PgException}s at error level; non-{@code PgException} failures are not logged.
     *
     * @param th  failure raised by the query
     * @param cls entity type, used in the log message
     * @return {@code true} when the operation should be retried
     */
    private boolean checkError(Throwable th, Class<?> cls) {
        if (!(th instanceof PgException)) {
            return false;
        }
        final PgException pgException = (PgException) th;
        final String sqlState = pgException.getSqlState();
        final boolean recoverable = sqlState != null && (sqlState.startsWith("5") || sqlState.startsWith("08"));
        if (recoverable) {
            // The trailing throwable argument is logged as the exception, not as a placeholder value.
            logger.debug("Recoverable failure handling type for {} , repository will retry", cls, pgException);
        } else {
            logger.error("Unrecoverable failure", pgException);
        }
        return recoverable;
    }

    /**
     * Builds a wrapper that exposes the rows of a query as a {@link Multi}.
     *
     * <p>Failures accepted by {@code checkError} are retried with the configured back-off
     * and attempt limit; whatever failure remains goes through {@code mapError}. An empty
     * result is not treated as an error here.
     *
     * @param cls entity type, used for log messages and {@code checkError}
     * @param <T> mapped row type
     * @return function turning a query supplier into a {@code Multi} of rows
     */
    public <T> Function<Supplier<Uni<RowSet<T>>>, Multi<T>> handleQueryMultiStream(Class<T> cls) {
        logger.debug("Handling multi select query for " + cls.getSimpleName());
        return query -> {
            final Multi<T> rows = query.get().onItem().transformToMulti(RowSet::toMulti);
            return rows
                .onFailure(failure -> checkError(failure, cls))
                .retry()
                .withBackOff(Duration.ofMillis(this.configuration.getInteger("repositoryRetryBackOff", Integer.valueOf(EnvVars.REPOSITORY_RETRY_BACKOFF))))
                .atMost(this.configuration.getInteger("repositoryMaxRetry", Integer.valueOf(EnvVars.REPOSITORY_MAX_RETRY)))
                .onFailure().transform(this::mapError);
        };
    }

    /**
     * Streams the rows of a parameterless query to a consumer inside a transaction.
     *
     * <p>Steps: obtain a connection, begin a transaction, prepare {@code str}, open a row
     * stream with the configured batch size and feed every mapped row to {@code consumer}.
     * The returned {@code Uni} resolves once the stream has been set up; it does not wait
     * for the stream to end.
     *
     * <p>Cleanup: the lock is released when the connection, the transaction or the prepared
     * statement cannot be obtained. When the stream ends, or when it reports an error, the
     * stream, transaction, statement and connection are closed through {@code closeStream}.
     * Previously a stream error was only logged, which left the lock held and the
     * connection open.
     *
     * @param pgPool    pool to take the connection from
     * @param lock      lock to release when processing finishes or fails; may be {@code null}
     * @param str       SQL text to prepare
     * @param rowMapper maps each row to {@code V}
     * @param consumer  receives every mapped row
     * @param <V>       mapped row type
     * @return a {@code Uni} that resolves when streaming has started, or fails if setup fails
     */
    public <V> Uni<Void> handleStreamProcessing(PgPool pgPool, Lock lock, String str, RowMapper<V> rowMapper, Consumer<V> consumer) {
        logger.debug("Handling stream query :" + str);
        return pgPool.getConnection()
            .onFailure().transform(Unchecked.function(connectFailure -> {
                logger.error(rowMapper.getClass().getSimpleName() + " row streamer failed to obtain connection", connectFailure);
                releaseLock(lock);
                throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
            }))
            .invoke(obtained -> logger.debug("Row streamer obtained connection"))
            .flatMap(connection -> connection.begin()
                .onFailure().invoke(beginFailure -> {
                    logger.error("row streamer failed to start transaction", beginFailure);
                    releaseLock(lock);
                    connection.closeAndForget();
                })
                .invoke(started -> logger.debug("Row streamer started transaction"))
                .flatMap(transaction -> connection.prepare(str)
                    .onFailure().invoke(prepareFailure -> {
                        logger.error("row streamer failed to prepare statement", prepareFailure);
                        releaseLock(lock);
                        transaction.commitAndForget();
                        transaction.completionAndForget();
                        connection.closeAndForget();
                    })
                    .invoke(statement -> {
                        final RowStream<Row> stream = statement.createStream(this.configuration.getInteger("repositoryStreamBatchSize", Integer.valueOf(EnvVars.REPOSITORY_STREAM_BATCH_SIZE)).intValue());
                        stream.fetch(EnvVars.REPOSITORY_STREAM_BATCH_SIZE)
                            .handler(row -> {
                                logger.debug("Stream fetched " + row.toJson().encodePrettily());
                                consumer.accept(rowMapper.map(row));
                            })
                            .exceptionHandler(streamFailure -> {
                                logger.error("Exception during row streaming ", streamFailure);
                                // Release the lock and the connection on error as well.
                                closeStream(stream, lock, transaction, statement, connection);
                            })
                            .endHandler(() -> {
                                logger.info("Closing stream....");
                                closeStream(stream, lock, transaction, statement, connection);
                            });
                    })))
            .replaceWithVoid();
    }

    /**
     * Streams the rows of a parameterized query to a consumer inside a transaction.
     *
     * <p>Steps: obtain a connection, begin a transaction, prepare {@code str}, open a row
     * stream with fetch size {@code num} and arguments {@code tuple}, and feed every mapped
     * row to {@code consumer}. The returned {@code Uni} resolves once the stream has been
     * set up; it does not wait for the stream to end.
     *
     * <p>Cleanup: the lock is released when the connection, the transaction or the prepared
     * statement cannot be obtained. When the stream ends, or when it reports an error, the
     * stream, transaction, statement and connection are closed through {@code closeStream}.
     * Previously a stream error was only logged, which left the lock held and the
     * connection open.
     *
     * @param pgPool    pool to take the connection from
     * @param lock      lock to release when processing finishes or fails; may be {@code null}
     * @param str       SQL text to prepare
     * @param rowMapper maps each row to {@code V}
     * @param consumer  receives every mapped row
     * @param tuple     statement arguments
     * @param num       fetch size for the row stream; must not be {@code null}
     * @param <V>       mapped row type
     * @return a {@code Uni} that resolves when streaming has started, or fails if setup fails
     */
    public <V> Uni<Void> handleStreamProcessing(PgPool pgPool, Lock lock, String str, RowMapper<V> rowMapper, Consumer<V> consumer, Tuple tuple, Integer num) {
        logger.debug("Handling stream query :" + str);
        return pgPool.getConnection()
            .onFailure().transform(Unchecked.function(connectFailure -> {
                logger.error(rowMapper.getClass().getSimpleName() + " row streamer failed to obtain connection", connectFailure);
                releaseLock(lock);
                throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
            }))
            .invoke(obtained -> logger.debug("Row streamer obtained connection"))
            .flatMap(connection -> connection.begin()
                .onFailure().invoke(beginFailure -> {
                    logger.error("row streamer failed to start transaction", beginFailure);
                    releaseLock(lock);
                    connection.closeAndForget();
                })
                .invoke(started -> logger.debug("Row streamer started transaction"))
                .flatMap(transaction -> connection.prepare(str)
                    .onFailure().invoke(prepareFailure -> {
                        logger.error("row streamer failed to prepare statement", prepareFailure);
                        releaseLock(lock);
                        transaction.commitAndForget();
                        transaction.completionAndForget();
                        connection.closeAndForget();
                    })
                    .invoke(statement -> {
                        final RowStream<Row> stream = statement.createStream(num.intValue(), tuple);
                        stream
                            .handler(row -> {
                                logger.debug("Stream fetched " + row.toJson().encodePrettily());
                                consumer.accept(rowMapper.map(row));
                            })
                            .exceptionHandler(streamFailure -> {
                                logger.error("Exception during row streaming ", streamFailure);
                                // Release the lock and the connection on error as well.
                                closeStream(stream, lock, transaction, statement, connection);
                            })
                            .endHandler(() -> {
                                logger.info("Closing stream....");
                                closeStream(stream, lock, transaction, statement, connection);
                            });
                    })))
            .replaceWithVoid();
    }

    /**
     * Releases the given lock and logs the release; does nothing for {@code null}.
     *
     * @param lock lock to release, may be {@code null}
     */
    private void releaseLock(Lock lock) {
        if (lock == null) {
            return;
        }
        lock.release();
        logger.info("Lock Released :" + String.valueOf(lock));
    }

    /**
     * Streams the rows of a parameterized query to a consumer inside a transaction and
     * waits for the stream to finish.
     *
     * <p>Steps: obtain a connection, begin a transaction, prepare {@code str}, open a row
     * stream with the configured batch size and arguments {@code tuple}, and feed every
     * mapped row to {@code consumer}. The returned {@code Uni} resolves when the stream
     * ends and fails when setup or the stream itself fails.
     *
     * <p>{@code uni} is an optional action that is subscribed to on every failure path and
     * when the stream is closed. A {@code null} value is replaced by a no-op before it
     * reaches {@code closeStream}, which would otherwise dereference it.
     *
     * <p>A stream error now closes the stream and fails the returned {@code Uni};
     * previously it was only logged and the returned {@code Uni} never resolved.
     *
     * @param pgPool    pool to take the connection from
     * @param uni       action to run on failure and when the stream is closed; may be {@code null}
     * @param str       SQL text to prepare
     * @param rowMapper maps each row to {@code V}
     * @param consumer  receives every mapped row
     * @param tuple     statement arguments
     * @param <V>       mapped row type
     * @return a {@code Uni} that resolves when the stream has ended
     */
    public <V> Uni<Void> handleStreamProcessing(PgPool pgPool, Uni<Void> uni, String str, RowMapper<V> rowMapper, Consumer<V> consumer, Tuple tuple) {
        logger.debug("Handling stream query :" + str);
        // Never null, so it is safe to hand to closeStream.
        final Uni<Void> closeAction = uni != null ? uni : Uni.createFrom().voidItem();
        return pgPool.getConnection()
            .onFailure().call(Unchecked.function(connectFailure -> {
                logger.error(rowMapper.getClass().getSimpleName() + " row streamer failed to obtain connection", connectFailure);
                if (uni != null) {
                    return uni.onItemOrFailure().transformToUni(Unchecked.function((ignoredItem, ignoredFailure) -> {
                        throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
                    }));
                }
                throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
            }))
            .invoke(obtained -> logger.debug("Row streamer obtained connection"))
            .flatMap(connection -> connection.begin()
                .onFailure().call(beginFailure -> {
                    logger.error("row streamer failed to start transaction", beginFailure);
                    return uni != null
                        ? connection.close().call(closed -> uni).onItemOrFailure().call(Unchecked.function((ignoredItem, ignoredFailure) -> {
                            throw new GenericFailure(new SqlError("Unable to obtain connection", "Row streamer could not obtain connection for type :" + str, "Row streamer could not obtain connection for type :" + str, null));
                        }))
                        : connection.close().onItemOrFailure().call(Unchecked.function((ignoredItem, ignoredFailure) -> {
                            throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
                        }));
                })
                .invoke(started -> logger.debug("Row streamer started transaction"))
                .flatMap(transaction -> {
                    // Settled by the end handler (success) or the exception handler (failure).
                    final Promise<Void> streamDone = Promise.promise();
                    return connection.prepare(str)
                        .onFailure().call(prepareFailure -> {
                            logger.error("row streamer failed to prepare statement", prepareFailure);
                            return uni != null
                                ? transaction.commit()
                                    .call(committed -> transaction.completion())
                                    .call(completed -> connection.close())
                                    .call(closed -> uni)
                                    .onItemOrFailure().transform(Unchecked.function((ignoredItem, ignoredFailure) -> {
                                        throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for statement :" + str, null, null));
                                    }))
                                : transaction.commit()
                                    .call(committed -> transaction.completion())
                                    .call(completed -> connection.close())
                                    .onItemOrFailure().call(Unchecked.function((ignoredItem, ignoredFailure) -> {
                                        throw new GenericFailure(new SqlError("Connection timeout", "Row streamer could not obtain connection for type :" + str, null, null));
                                    }));
                        })
                        .invoke(statement -> {
                            final RowStream<Row> stream = statement.createStream(this.configuration.getInteger("repositoryStreamBatchSize", Integer.valueOf(EnvVars.REPOSITORY_STREAM_BATCH_SIZE)).intValue(), tuple);
                            stream.fetch(EnvVars.REPOSITORY_STREAM_BATCH_SIZE)
                                .handler(row -> {
                                    logger.debug("Stream fetched " + row.toJson().encodePrettily());
                                    consumer.accept(rowMapper.map(row));
                                })
                                .exceptionHandler(streamFailure -> {
                                    logger.error("Exception during row streaming ", streamFailure);
                                    closeStream(stream, closeAction, transaction, statement, connection);
                                    // Without this the caller would wait forever on the promise.
                                    streamDone.fail(streamFailure);
                                })
                                .endHandler(() -> {
                                    logger.info("Closing stream....");
                                    closeStream(stream, closeAction, transaction, statement, connection);
                                    streamDone.complete();
                                });
                        })
                        .call(statement -> streamDone.future());
                }))
            .replaceWithVoid();
    }

    /**
     * Tears down a row stream and everything opened for it, then releases the lock.
     *
     * <p>Order: close the stream, commit the transaction, await its completion, close the
     * prepared statement, close the connection, release the lock. The first four steps
     * stop at the first failure, but the connection is closed and the lock is released
     * whether or not an earlier step failed, so a failed commit no longer leaks either.
     * The outcome is only logged; this method is fire-and-forget.
     *
     * @param rowStream         stream to close
     * @param lock              lock to release at the end; may be {@code null}
     * @param transaction       transaction to commit
     * @param preparedStatement statement to close
     * @param sqlConnection     connection to close
     */
    private void closeStream(RowStream<Row> rowStream, Lock lock, Transaction transaction, PreparedStatement preparedStatement, SqlConnection sqlConnection) {
        rowStream.close()
            .call(streamClosed -> transaction.commit())
            .call(committed -> transaction.completion())
            .call(completed -> preparedStatement.close())
            .onItemOrFailure().call((ignoredItem, ignoredFailure) -> sqlConnection.close())
            .onItemOrFailure().invoke((ignoredItem, ignoredFailure) -> releaseLock(lock))
            .subscribe().with(
                done -> logger.info("Stream closed nicely...."),
                failure -> logger.error("Error closing the stream", failure));
    }

    /**
     * Runs an optional action, then tears down a row stream and everything opened for it.
     *
     * <p>Order: {@code uni} (skipped when {@code null}), close the stream, commit the
     * transaction, await its completion, close the prepared statement, close the
     * connection. The steps before the connection close stop at the first failure, but the
     * connection is closed whether or not an earlier step failed. The outcome is only
     * logged; this method is fire-and-forget.
     *
     * @param rowStream         stream to close
     * @param uni               action to run first; may be {@code null}
     * @param transaction       transaction to commit
     * @param preparedStatement statement to close
     * @param sqlConnection     connection to close
     */
    private void closeStream(RowStream<Row> rowStream, Uni<Void> uni, Transaction transaction, PreparedStatement preparedStatement, SqlConnection sqlConnection) {
        // Callers null-check this argument elsewhere, so tolerate null here too.
        final Uni<Void> first = uni != null ? uni : Uni.createFrom().voidItem();
        first
            .call(actionDone -> rowStream.close())
            .call(streamClosed -> transaction.commit())
            .call(committed -> transaction.completion())
            .call(completed -> preparedStatement.close())
            .onItemOrFailure().call((ignoredItem, ignoredFailure) -> sqlConnection.close())
            .subscribe().with(
                done -> logger.info("Stream closed nicely...."),
                failure -> logger.error("Error closing the stream", failure));
    }

    /**
     * Returns the record-style representation
     * {@code RepositoryHandler[vertx=…, pgPool=…, sqlClient=…, configuration=…]}.
     *
     * <p>Written out explicitly; the decompiled {@code invoke-custom} bootstrap call that
     * stood here is not valid Java source.
     */
    @Override // java.lang.Record
    public final String toString() {
        return "RepositoryHandler[vertx=" + this.vertx
            + ", pgPool=" + this.pgPool
            + ", sqlClient=" + this.sqlClient
            + ", configuration=" + this.configuration + "]";
    }

    /**
     * Combines the hash codes of the four components, in declaration order, with the
     * usual multiply-by-31 scheme; {@code null} components contribute {@code 0}.
     *
     * <p>Written out explicitly; the decompiled {@code invoke-custom} bootstrap call that
     * stood here is not valid Java source.
     */
    @Override // java.lang.Record
    public final int hashCode() {
        int result = Objects.hashCode(this.vertx);
        result = 31 * result + Objects.hashCode(this.pgPool);
        result = 31 * result + Objects.hashCode(this.sqlClient);
        result = 31 * result + Objects.hashCode(this.configuration);
        return result;
    }

    /**
     * Two handlers are equal when all four components are equal according to
     * {@link Objects#equals(Object, Object)}.
     *
     * <p>Written out explicitly; the decompiled {@code invoke-custom} bootstrap call that
     * stood here is not valid Java source.
     *
     * @param obj object to compare with, may be {@code null}
     * @return {@code true} if {@code obj} is a {@code RepositoryHandler} with equal components
     */
    @Override // java.lang.Record
    public final boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof RepositoryHandler)) {
            return false;
        }
        final RepositoryHandler other = (RepositoryHandler) obj;
        return Objects.equals(this.vertx, other.vertx)
            && Objects.equals(this.pgPool, other.pgPool)
            && Objects.equals(this.sqlClient, other.sqlClient)
            && Objects.equals(this.configuration, other.configuration);
    }

    /**
     * Accessor for the {@code vertx} component.
     *
     * @return the Vert.x instance passed to the constructor
     */
    public Vertx vertx() {
        return vertx;
    }

    /**
     * Accessor for the {@code pgPool} component.
     *
     * @return the connection pool passed to the constructor
     */
    public PgPool pgPool() {
        return pgPool;
    }

    /**
     * Accessor for the {@code sqlClient} component.
     *
     * @return the SQL client passed to the constructor
     */
    public SqlClient sqlClient() {
        return sqlClient;
    }

    /**
     * Accessor for the {@code configuration} component.
     *
     * @return the configuration object passed to the constructor (not a copy)
     */
    public JsonObject configuration() {
        return configuration;
    }
}
