package lepus.std;

import cats.effect.kernel.GenConcurrent;
import cats.syntax.ApplicativeErrorIdOps$;
import cats.syntax.FlatMapOps$;
import cats.syntax.package$all$;
import fs2.Stream;
import fs2.Stream$;
import java.io.Serializable;
import lepus.client.Channel;
import lepus.client.DeliveredMessage;
import lepus.client.Message;
import lepus.client.MessageTypes$package$;
import lepus.client.QueueAPI;
import lepus.client.apis.NormalMessagingChannel;
import lepus.protocol.domains.Domains$package$ExchangeName$;
import lepus.protocol.domains.Domains$package$QueueName$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.runtime.ModuleSerializationProxy;
import scala.util.Left;
import scala.util.NotGiven$;
import scala.util.Right;

/* compiled from: RPCChannel.scala */
/* loaded from: input_file:lepus/std/RPCChannel$.class */
public final class RPCChannel$ implements Serializable {
    public static final RPCChannel$ MODULE$ = new RPCChannel$();

    private RPCChannel$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(RPCChannel$.class);
    }

    public <F, I, O> Object server(RPCDefinition<I, O> rPCDefinition, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent) {
        package$all$ package_all_ = package$all$.MODULE$;
        QueueAPI queue = channel.queue();
        return package_all_.toFunctorOps(queue.declare(rPCDefinition.name(), queue.declare$default$2(), true, queue.declare$default$4(), queue.declare$default$5(), queue.declare$default$6(), queue.declare$default$7()), genConcurrent).map(option -> {
            return new RPCServer<F, I, O>(rPCDefinition, genConcurrent, channel) { // from class: lepus.std.RPCChannel$$anon$1
                private final RPCDefinition endpoint$2;
                private final GenConcurrent evidence$1$2;
                private final Channel ch$2;

                {
                    this.endpoint$2 = rPCDefinition;
                    this.evidence$1$2 = genConcurrent;
                    this.ch$2 = channel;
                }

                @Override // lepus.std.RPCServer
                public Object respond(RequestMethod requestMethod, Object obj) {
                    Left encode = this.endpoint$2.serverCodec().encode((ChannelCodec<O>) obj);
                    if (encode instanceof Left) {
                        return ApplicativeErrorIdOps$.MODULE$.raiseError$extension((Throwable) package$all$.MODULE$.catsSyntaxApplicativeErrorId((Throwable) encode.value()), this.evidence$1$2);
                    }
                    if (!(encode instanceof Right)) {
                        throw new MatchError(encode);
                    }
                    return FlatMapOps$.MODULE$.$greater$greater$extension(package$all$.MODULE$.catsSyntaxFlatMapOps(this.ch$2.messaging().publishRaw(MessageTypes$package$.MODULE$.EnvelopeRaw().apply(Domains$package$ExchangeName$.MODULE$.default(), requestMethod.sender(), false, (Message) ((Right) encode).value())), this.evidence$1$2), () -> {
                        return r2.respond$$anonfun$1(r3);
                    }, this.evidence$1$2);
                }

                @Override // lepus.std.RPCServer
                public Stream requests() {
                    NormalMessagingChannel messaging = this.ch$2.messaging();
                    return messaging.consumeRaw(this.endpoint$2.name(), messaging.consumeRaw$default$2(), false, messaging.consumeRaw$default$4(), messaging.consumeRaw$default$5(), messaging.consumeRaw$default$6()).flatMap(deliveredMessage -> {
                        Left decode = this.endpoint$2.clientCodec().decode(deliveredMessage.message());
                        if (decode instanceof Left) {
                            return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                        }
                        if (!(decode instanceof Right)) {
                            throw new MatchError(decode);
                        }
                        Message message = (Message) ((Right) decode).value();
                        return (Stream) message.properties().messageId().zip(message.properties().replyTo().flatMap(RPCChannel$::lepus$std$RPCChannel$$anon$1$$_$_$$anonfun$1)).fold(() -> {
                            return r1.requests$$anonfun$1$$anonfun$1(r2);
                        }, (v2) -> {
                            return RPCChannel$.lepus$std$RPCChannel$$anon$1$$_$requests$$anonfun$1$$anonfun$2(r2, r3, v2);
                        });
                    }, NotGiven$.MODULE$.value());
                }

                @Override // lepus.std.RPCServer
                public Object ignore(RequestMethod requestMethod) {
                    NormalMessagingChannel messaging = this.ch$2.messaging();
                    return messaging.ack(requestMethod.tag(), messaging.ack$default$2());
                }

                @Override // lepus.std.RPCServer
                public Object reject(RequestMethod requestMethod) {
                    return reject(requestMethod.tag());
                }

                private Object reject(long j) {
                    return this.ch$2.messaging().reject(j, false);
                }

                private final Object respond$$anonfun$1(RequestMethod requestMethod) {
                    return this.ch$2.messaging().ack(requestMethod.tag(), false);
                }

                private final Stream requests$$anonfun$1$$anonfun$1(DeliveredMessage deliveredMessage) {
                    return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                }
            };
        });
    }

    public <F, I, O> Object client(RPCDefinition<I, O> rPCDefinition, Option<String> option, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent) {
        Object as;
        package$all$ package_all_ = package$all$.MODULE$;
        if (None$.MODULE$.equals(option)) {
            package$all$ package_all_2 = package$all$.MODULE$;
            package$all$ package_all_3 = package$all$.MODULE$;
            QueueAPI queue = channel.queue();
            as = package_all_2.toFunctorOps(package_all_3.toFlatMapOps(queue.declare(Domains$package$QueueName$.MODULE$.autoGen(), queue.declare$default$2(), queue.declare$default$3(), true, queue.declare$default$5(), queue.declare$default$6(), queue.declare$default$7()), genConcurrent).flatMap(option2 -> {
                return genConcurrent.fromOption(option2, RPCChannel$::client$$anonfun$1$$anonfun$1);
            }), genConcurrent).map(declareOk -> {
                return declareOk.queue();
            });
        } else {
            if (!(option instanceof Some)) {
                throw new MatchError(option);
            }
            String str = (String) ((Some) option).value();
            package$all$ package_all_4 = package$all$.MODULE$;
            QueueAPI queue2 = channel.queue();
            as = package_all_4.toFunctorOps(queue2.declare(str, queue2.declare$default$2(), true, queue2.declare$default$4(), queue2.declare$default$5(), queue2.declare$default$6(), queue2.declare$default$7()), genConcurrent).as(str);
        }
        return package_all_.toFunctorOps(as, genConcurrent).map(str2 -> {
            return new RPCClient<F, I, O>(rPCDefinition, genConcurrent, channel, str2) { // from class: lepus.std.RPCChannel$$anon$2
                private final RPCDefinition endpoint$4;
                private final GenConcurrent F$3;
                private final Channel ch$4;
                private final String q$1;

                {
                    this.endpoint$4 = rPCDefinition;
                    this.F$3 = genConcurrent;
                    this.ch$4 = channel;
                    this.q$1 = str2;
                }

                @Override // lepus.std.RPCClient
                public Object send(String str2, Object obj) {
                    return this.endpoint$4.clientCodec().encode((ChannelCodec<I>) obj).fold(th -> {
                        return ApplicativeErrorIdOps$.MODULE$.raiseError$extension((Throwable) package$all$.MODULE$.catsSyntaxApplicativeErrorId(th), this.F$3);
                    }, message -> {
                        return this.ch$4.messaging().publishRaw(MessageTypes$package$.MODULE$.EnvelopeRaw().apply(Domains$package$ExchangeName$.MODULE$.default(), this.endpoint$4.name(), false, message.withMessageId(str2).withReplyTo(this.q$1)));
                    });
                }

                @Override // lepus.std.RPCClient
                public Stream responses() {
                    NormalMessagingChannel messaging = this.ch$4.messaging();
                    return messaging.consumeRaw(this.q$1, messaging.consumeRaw$default$2(), false, messaging.consumeRaw$default$4(), messaging.consumeRaw$default$5(), messaging.consumeRaw$default$6()).flatMap(deliveredMessage -> {
                        Left decode = this.endpoint$4.serverCodec().decode(deliveredMessage.message());
                        if (decode instanceof Left) {
                            return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                        }
                        if (!(decode instanceof Right)) {
                            throw new MatchError(decode);
                        }
                        Message message = (Message) ((Right) decode).value();
                        return (Stream) message.properties().correlationId().fold(() -> {
                            return r1.responses$$anonfun$1$$anonfun$1(r2);
                        }, (v2) -> {
                            return RPCChannel$.lepus$std$RPCChannel$$anon$2$$_$responses$$anonfun$1$$anonfun$2(r2, r3, v2);
                        });
                    }, NotGiven$.MODULE$.value());
                }

                @Override // lepus.std.RPCClient
                public Object processed(ResponseMethod responseMethod) {
                    NormalMessagingChannel messaging = this.ch$4.messaging();
                    return messaging.ack(responseMethod.tag(), messaging.ack$default$2());
                }

                private Object reject(long j) {
                    return this.ch$4.messaging().reject(j, false);
                }

                private final Stream responses$$anonfun$1$$anonfun$1(DeliveredMessage deliveredMessage) {
                    return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                }
            };
        });
    }

    public <F, I, O> Option<String> client$default$2() {
        return None$.MODULE$;
    }

    public static final /* synthetic */ Option lepus$std$RPCChannel$$anon$1$$_$_$$anonfun$1(String str) {
        return Domains$package$QueueName$.MODULE$.from(str).toOption();
    }

    public static final /* synthetic */ Stream lepus$std$RPCChannel$$anon$1$$_$requests$$anonfun$1$$anonfun$2(Message message, DeliveredMessage deliveredMessage, Tuple2 tuple2) {
        return Stream$.MODULE$.emit(RequestMethod$.MODULE$.apply((String) tuple2._1(), (String) tuple2._2(), message.payload(), deliveredMessage.deliveryTag()));
    }

    private static final Throwable client$$anonfun$1$$anonfun$1() {
        return new UnknownError("Must respond with Queue name");
    }

    public static final /* synthetic */ Stream lepus$std$RPCChannel$$anon$2$$_$responses$$anonfun$1$$anonfun$2(Message message, DeliveredMessage deliveredMessage, String str) {
        return Stream$.MODULE$.emit(ResponseMethod$.MODULE$.apply(str, message.payload(), deliveredMessage.deliveryTag()));
    }
}
