package io.r2dbc.postgresql.client;

import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.message.Format;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CloseComplete;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ParseComplete;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.Bind;
import io.r2dbc.postgresql.message.frontend.Close;
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
import io.r2dbc.postgresql.message.frontend.Describe;
import io.r2dbc.postgresql.message.frontend.Execute;
import io.r2dbc.postgresql.message.frontend.ExecutionType;
import io.r2dbc.postgresql.message.frontend.Flush;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Parse;
import io.r2dbc.postgresql.message.frontend.Sync;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/lib/r2dbc-postgresql-0.8.6.RELEASE.jar:io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.class */
public final class ExtendedQueryMessageFlow {
    public static final Pattern PARAMETER_SYMBOL = Pattern.compile("\\$([\\d]+)", 32);
    private static final Predicate<BackendMessage> PARSE_TAKE_UNTIL = backendMessage -> {
        return (backendMessage instanceof ParseComplete) || (backendMessage instanceof ReadyForQuery);
    };

    private ExtendedQueryMessageFlow() {
    }

    public static Flux<BackendMessage> execute(Binding binding, Client client, PortalNameSupplier portalNameSupplier, String str, String str2, boolean z, int i) {
        Assert.requireNonNull(binding, "binding must not be null");
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(portalNameSupplier, "portalNameSupplier must not be null");
        Assert.requireNonNull(str, "statementName must not be null");
        String str3 = portalNameSupplier.get();
        return Flux.defer(() -> {
            Flux<FrontendMessage> bindFlow = toBindFlow(client.getContext(), binding, str3, str, str2, z);
            return i == 0 ? fetchAll(bindFlow, client, str3) : fetchCursored(bindFlow, client, str3, i);
        }).doOnDiscard(ReferenceCounted.class, (v0) -> {
            ReferenceCountUtil.release(v0);
        });
    }

    private static Flux<BackendMessage> fetchAll(Flux<FrontendMessage> flux, Client client, String str) {
        return (Flux) client.exchange(flux.concatWithValues(new CompositeFrontendMessage(new Execute(str, 0), new Close(str, ExecutionType.PORTAL), Sync.INSTANCE))).as(Operators::discardOnCancel);
    }

    private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> flux, Client client, String str, int i) {
        DirectProcessor create = DirectProcessor.create();
        FluxSink<T> sink = create.sink();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return (Flux) client.exchange(flux.concatWithValues(new CompositeFrontendMessage(new Execute(str, i), Flush.INSTANCE)).concatWith(create)).handle((backendMessage, synchronousSink) -> {
            if (backendMessage instanceof CommandComplete) {
                sink.next(new Close(str, ExecutionType.PORTAL));
                sink.next(Sync.INSTANCE);
                sink.complete();
                synchronousSink.next(backendMessage);
                return;
            }
            if (backendMessage instanceof ErrorResponse) {
                sink.next(Sync.INSTANCE);
                sink.complete();
                synchronousSink.next(backendMessage);
            } else {
                if (!(backendMessage instanceof PortalSuspended)) {
                    synchronousSink.next(backendMessage);
                    return;
                }
                if (!atomicBoolean.get()) {
                    sink.next(new Execute(str, i));
                    sink.next(Flush.INSTANCE);
                } else {
                    sink.next(new Close(str, ExecutionType.PORTAL));
                    sink.next(Sync.INSTANCE);
                    sink.complete();
                }
            }
        }).as(flux2 -> {
            return Operators.discardOnCancel(flux2, () -> {
                atomicBoolean.set(true);
            });
        });
    }

    public static Flux<BackendMessage> parse(Client client, String str, String str2, int[] iArr) {
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(str, "name must not be null");
        Assert.requireNonNull(str2, "query must not be null");
        Assert.requireNonNull(iArr, "types must not be null");
        return client.exchange(PARSE_TAKE_UNTIL, Flux.just(new CompositeFrontendMessage(new Parse(str, iArr, str2), Flush.INSTANCE))).doOnNext(backendMessage -> {
            if (backendMessage instanceof ErrorResponse) {
                client.send(Sync.INSTANCE);
            }
        });
    }

    public static Flux<BackendMessage> closeStatement(Client client, String str) {
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(str, "name must not be null");
        Flux<BackendMessage> exchange = client.exchange(Flux.just(new CompositeFrontendMessage(new Close(str, ExecutionType.STATEMENT), Sync.INSTANCE)));
        Class<CloseComplete> cls = CloseComplete.class;
        CloseComplete.class.getClass();
        return exchange.takeUntil((v1) -> {
            return r1.isInstance(v1);
        });
    }

    private static Flux<FrontendMessage> toBindFlow(ConnectionContext connectionContext, Binding binding, String str, String str2, String str3, boolean z) {
        return Flux.fromIterable(binding.getParameterValues()).flatMap(publisher -> {
            return publisher == Parameter.NULL_VALUE ? Flux.just(Bind.NULL_VALUE) : Flux.from(publisher).reduce(Unpooled.compositeBuffer(), (compositeByteBuf, byteBuf) -> {
                return compositeByteBuf.addComponent(true, byteBuf);
            });
        }).collectList().flatMapMany(list -> {
            return Flux.just(new CompositeFrontendMessage(new Bind(str, binding.getParameterFormats(), list, resultFormat(z), str2), new Describe(str, ExecutionType.PORTAL)));
        }).doOnSubscribe(subscription -> {
            QueryLogger.logQuery(connectionContext, str3);
        });
    }

    private static Collection<Format> resultFormat(boolean z) {
        return z ? Format.binary() : Collections.emptyList();
    }
}
