package lepus.std;

import cats.Semigroupal$;
import cats.UnorderedFoldable$;
import cats.effect.kernel.Clock;
import cats.effect.kernel.Clock$;
import cats.effect.kernel.GenConcurrent;
import cats.syntax.ApplicativeErrorIdOps$;
import cats.syntax.EitherOps$;
import cats.syntax.SemigroupalOps2$;
import cats.syntax.package$all$;
import fs2.Stream;
import fs2.Stream$;
import java.io.Serializable;
import lepus.client.Channel;
import lepus.client.DeliveredMessage;
import lepus.client.ExchangeAPI;
import lepus.client.Message;
import lepus.client.MessageTypes$package$;
import lepus.client.QueueAPI;
import lepus.client.apis.NormalMessagingChannel;
import lepus.protocol.domains.Domains$package$ExchangeType$;
import lepus.protocol.domains.Domains$package$QueueName$;
import lepus.protocol.domains.Domains$package$ShortString$;
import lepus.protocol.domains.Domains$package$Timestamp$;
import lepus.std.EventChannel;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.StringOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.FiniteDuration;
import scala.package$;
import scala.quoted.Expr;
import scala.quoted.FromExpr$;
import scala.quoted.Quotes;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.util.Either;
import scala.util.Left;
import scala.util.NotGiven$;
import scala.util.Right;
import scala.util.matching.Regex;

/* compiled from: EventChannel.scala */
/* loaded from: input_file:lepus/std/EventChannel$.class */
public final class EventChannel$ implements Serializable {
    // NOTE(review): shown as null in this decompiled view; presumably the real value is
    // EventChannel$InvalidTopicName$.MODULE$ (that singleton is what the code below uses) — confirm.
    public static final EventChannel$InvalidTopicName$ InvalidTopicName = null;
    /** The single instance of this class, created during class initialization. */
    public static final EventChannel$ MODULE$ = new EventChannel$();

    /** Private: the only instance in this file is the one stored in {@code MODULE$}. */
    private EventChannel$() {
        super();
    }

    /**
     * Java serialization hook: the object written in place of this instance is a
     * {@code ModuleSerializationProxy} built from this class.
     *
     * @return a new proxy for {@code EventChannel$.class}
     */
    private Object writeReplace() {
        final Class<EventChannel$> moduleClass = EventChannel$.class;
        return new ModuleSerializationProxy(moduleClass);
    }

    /**
     * Builds an {@code EventPublisher} for the given topic.
     *
     * <p>Declares the topic's exchange on {@code channel} with exchange type {@code Topic}
     * (every other declare argument is the API's default) and maps the outcome of that
     * declaration to an anonymous {@code EventPublisher}. The value returned by the declare
     * call itself is ignored.
     *
     * @param topicDefinition supplies the exchange name, the payload codec and the topic-name encoder
     * @param channel         channel whose {@code exchange()} and {@code messaging()} APIs are used
     * @param genConcurrent   type-class instance used for map/flatMap and for raising errors
     * @param clock           clock read to timestamp each published message
     * @return the declare effect mapped to the publisher (typed {@code Object} in this decompiled view)
     */
    public <F, T> Object publisher(TopicDefinition<T> topicDefinition, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent, Clock<F> clock) {
        package$all$ package_all_ = package$all$.MODULE$;
        ExchangeAPI exchange = channel.exchange();
        // Declare the exchange as a Topic exchange; `option` (the declare response) is not used below.
        return package_all_.toFunctorOps(exchange.declare(topicDefinition.exchange(), Domains$package$ExchangeType$.MODULE$.Topic(), exchange.declare$default$3(), exchange.declare$default$4(), exchange.declare$default$5(), exchange.declare$default$6(), exchange.declare$default$7(), exchange.declare$default$8()), genConcurrent).map(option -> {
            // NOTE(review): the constructor-style arguments and the initializer block below appear to be
            // the decompiler's rendering of captured variables, not hand-written Java.
            return new EventPublisher<F, T>(clock, genConcurrent, topicDefinition, channel) { // from class: lepus.std.EventChannel$$anon$1
                private final TopicDefinition topic$3;
                private final GenConcurrent evidence$1$3;
                private final Channel ch$3;
                // Effect that reads the clock's real time and converts it (via toMillis) to a Timestamp.
                private final Object currentTime;

                {
                    this.topic$3 = topicDefinition;
                    this.evidence$1$3 = genConcurrent;
                    this.ch$3 = channel;
                    this.currentTime = package$all$.MODULE$.toFunctorOps(Clock$.MODULE$.apply(clock).realTime(), genConcurrent).map(EventChannel$::lepus$std$EventChannel$$anon$1$$_$$lessinit$greater$$anonfun$1);
                }

                /**
                 * Encodes {@code obj} and its topic name, then publishes the message.
                 *
                 * @param str value set as the message id on the outgoing message
                 * @param obj payload; passed to the topic's codec and to the topic-name encoder
                 * @return a raised error if encoding the payload or the topic name yields a Left,
                 *         otherwise the effect that reads the current time and publishes
                 */
                @Override // lepus.std.EventPublisher
                public Object publish(String str, Object obj) {
                    Tuple2 tuple2;
                    // Pair the encoded message with the encoded topic name; a failed topic name is
                    // converted to an InvalidTopicName before the two Eithers are combined.
                    Left left = (Either) SemigroupalOps2$.MODULE$.product$extension((Either) package$all$.MODULE$.catsSyntaxSemigroupalOps2(this.topic$3.codec().encode((ChannelCodec<T>) obj)), EitherOps$.MODULE$.leftMap$extension(package$all$.MODULE$.catsSyntaxEither((Either) TopicNameEncoder$.MODULE$.get$extension(this.topic$3.topic()).apply(obj)), EventChannel$::lepus$std$EventChannel$$anon$1$$_$publish$$anonfun$1), Semigroupal$.MODULE$.catsSemigroupalForEither());
                    if (left instanceof Left) {
                        // Either side failed: surface the Throwable through the effect.
                        return ApplicativeErrorIdOps$.MODULE$.raiseError$extension((Throwable) package$all$.MODULE$.catsSyntaxApplicativeErrorId((Throwable) left.value()), this.evidence$1$3);
                    }
                    if (!(left instanceof Right) || (tuple2 = (Tuple2) ((Right) left).value()) == null) {
                        throw new MatchError(left);
                    }
                    Message message = (Message) tuple2._1();
                    String str2 = (String) tuple2._2();
                    // The time is read when the returned effect runs, then handed to the publish step.
                    return package$all$.MODULE$.toFlatMapOps(this.currentTime, this.evidence$1$3).flatMap(obj2 -> {
                        return publish$$anonfun$2(message, str, str2, BoxesRunTime.unboxToLong(obj2));
                    });
                }

                // Publishes to the topic's exchange with `str2` (the encoded topic name) as the second
                // envelope argument, stamping the message with id `str` and timestamp `j`.
                // NOTE(review): the literal `false` is presumably the AMQP `mandatory` flag — confirm.
                private final /* synthetic */ Object publish$$anonfun$2(Message message, String str, String str2, long j) {
                    return this.ch$3.messaging().publishRaw(MessageTypes$package$.MODULE$.EnvelopeRaw().apply(this.topic$3.exchange(), str2, false, message.withMessageId(str).withTimestamp(j)));
                }
            };
        });
    }

    /**
     * Builds an {@code EventConsumer} for the given topic.
     *
     * <p>Steps, in order: declare the topic's exchange as a {@code Topic} exchange; declare a
     * queue (auto-named when {@code option} is {@code None}, otherwise the given name); bind
     * that queue to the exchange once per selector in {@code seq} (or once with {@code "#"}
     * when {@code seq} is empty); finally wrap the channel and queue in an anonymous
     * {@code EventConsumer}.
     *
     * @param topicDefinition supplies the exchange name and the codec used to decode deliveries
     * @param option          optional queue name; {@code None} requests an auto-generated name
     * @param seq             selectors passed as the third argument of {@code queue.bind}
     * @param channel         channel whose exchange, queue and messaging APIs are used
     * @param genConcurrent   type-class instance used for map/flatMap/traverse and {@code fromOption}
     * @return the composed effect yielding the consumer (typed {@code Object} in this decompiled view)
     * @throws MatchError if {@code option} is neither {@code None} nor a {@code Some}
     */
    public <F, T> Object consumer(TopicDefinition<T> topicDefinition, Option<String> option, Seq<String> seq, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent) {
        package$all$ package_all_ = package$all$.MODULE$;
        ExchangeAPI exchange = channel.exchange();
        // Declare the exchange first; its response (`option2`) is not used by the outer lambda.
        return package_all_.toFlatMapOps(exchange.declare(topicDefinition.exchange(), Domains$package$ExchangeType$.MODULE$.Topic(), exchange.declare$default$3(), exchange.declare$default$4(), exchange.declare$default$5(), exchange.declare$default$6(), exchange.declare$default$7(), exchange.declare$default$8()), genConcurrent).flatMap(option2 -> {
            // `as` ends up holding the effect that yields the queue name to consume from.
            Object as;
            package$all$ package_all_2 = package$all$.MODULE$;
            package$all$ package_all_3 = package$all$.MODULE$;
            if (None$.MODULE$.equals(option)) {
                // No name given: declare a queue with an auto-generated name and read the name back
                // from the declare response.
                // NOTE(review): the explicit `true` is the 4th declare argument — presumably `exclusive`; confirm.
                package$all$ package_all_4 = package$all$.MODULE$;
                package$all$ package_all_5 = package$all$.MODULE$;
                QueueAPI queue = channel.queue();
                // NOTE(review): the inner lambda parameter reuses the name `option2` (decompiler naming);
                // inside it, `option2` is the queue-declare response. A missing response becomes
                // UnknownError("Must respond with Queue name").
                as = package_all_4.toFunctorOps(package_all_5.toFlatMapOps(queue.declare(Domains$package$QueueName$.MODULE$.autoGen(), queue.declare$default$2(), queue.declare$default$3(), true, queue.declare$default$5(), queue.declare$default$6(), queue.declare$default$7()), genConcurrent).flatMap(option2 -> {
                    return genConcurrent.fromOption(option2, EventChannel$::consumer$$anonfun$1$$anonfun$1$$anonfun$1);
                }), genConcurrent).map(declareOk -> {
                    return declareOk.queue();
                });
            } else {
                if (!(option instanceof Some)) {
                    throw new MatchError(option);
                }
                // Name given: declare that queue and keep the caller's name, discarding the response.
                // NOTE(review): the explicit `true` is the 3rd declare argument — presumably `durable`; confirm.
                String str = (String) ((Some) option).value();
                package$all$ package_all_6 = package$all$.MODULE$;
                QueueAPI queue2 = channel.queue();
                as = package_all_6.toFunctorOps(queue2.declare(str, queue2.declare$default$2(), true, queue2.declare$default$4(), queue2.declare$default$5(), queue2.declare$default$6(), queue2.declare$default$7()), genConcurrent).as(str);
            }
            // Pair the queue name with the selectors to bind.
            return package_all_2.toFlatMapOps(package_all_3.toFunctorOps(as, genConcurrent).map(str2 -> {
                Seq seq2;
                if (seq.isEmpty()) {
                    // NOTE(review): the two locals below are assigned but never read; the anonymous class
                    // body looks like the decompiler inlining lepus.std.TopicSelector$package$TopicSelector$.
                    // The only effective result of this branch is the single-element selector list "#".
                    TopicSelector$package$ topicSelector$package$ = TopicSelector$package$.MODULE$;
                    TopicSelector$package$TopicSelector$ topicSelector$package$TopicSelector$ = new Serializable() { // from class: lepus.std.TopicSelector$package$TopicSelector$
                        private static final Regex validPattern = StringOps$.MODULE$.r$extension(Predef$.MODULE$.augmentString("^(?:(?:[A-Za-z0-9]+|[*#])\\.)*(?:[A-Za-z0-9]+|[*#])?$"));

                        private Object writeReplace() {
                            return new ModuleSerializationProxy(TopicSelector$package$TopicSelector$.class);
                        }

                        // Macro helper: validates a string literal via from(); aborts when the expression
                        // is not a literal. `r1`/`r2` are unresolved decompiler placeholders.
                        /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
                        private final Expr<String> build(Expr<String> expr, Quotes quotes) {
                            Some value = quotes.value(expr, FromExpr$.MODULE$.StringFromExpr());
                            if (value instanceof Some) {
                                return (Expr) from((String) value.value()).fold((v1) -> {
                                    return TopicSelector$package$.lepus$std$TopicSelector$package$TopicSelector$$$_$build$$anonfun$1(r1, v1);
                                }, (v1) -> {
                                    return TopicSelector$package$.lepus$std$TopicSelector$package$TopicSelector$$$_$build$$anonfun$2(r2, v1);
                                });
                            }
                            if (None$.MODULE$.equals(value)) {
                                throw quotes.reflect().report().errorAndAbort("Not a literal value!");
                            }
                            throw new MatchError(value);
                        }

                        // Returns ShortString.from(str2) when str2 matches validPattern, else a Left with
                        // an "Invalid topic selector!" message.
                        public Either<String, String> from(String str2) {
                            return validPattern.matches(str2) ? Domains$package$ShortString$.MODULE$.from(str2) : package$.MODULE$.Left().apply(new StringBuilder(26).append("Invalid topic selector! `").append(str2).append("`").toString());
                        }

                        // Returns its argument unchanged.
                        public String exact(String str2) {
                            return str2;
                        }

                        public final Expr<String> inline$build(Expr<String> expr, Quotes quotes) {
                            return build(expr, quotes);
                        }
                    };
                    seq2 = (Seq) new $colon.colon("#", Nil$.MODULE$);
                } else {
                    seq2 = seq;
                }
                return Tuple2$.MODULE$.apply(str2, seq2);
            }), genConcurrent).flatMap(tuple2 -> {
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                String str3 = (String) tuple2._1();
                // Bind the queue to the topic's exchange once per selector, then build the consumer.
                // The list of bind results (`list`) is not used.
                return package$all$.MODULE$.toFunctorOps(package$all$.MODULE$.toTraverseOps(((Seq) tuple2._2()).toList(), UnorderedFoldable$.MODULE$.catsTraverseForList()).traverse(str4 -> {
                    QueueAPI queue3 = channel.queue();
                    return queue3.bind(str3, topicDefinition.exchange(), str4, queue3.bind$default$4(), queue3.bind$default$5());
                }, genConcurrent), genConcurrent).map(list -> {
                    // NOTE(review): as with the publisher, the constructor-style arguments appear to be
                    // the decompiler's rendering of captured variables.
                    return new EventConsumer<F, T>(channel, str3, topicDefinition) { // from class: lepus.std.EventChannel$$anon$2
                        private final Channel ch$8;
                        private final String q$3;
                        private final TopicDefinition topic$8;

                        {
                            this.ch$8 = channel;
                            this.q$3 = str3;
                            this.topic$8 = topicDefinition;
                        }

                        /**
                         * Stream of decoded events read from the bound queue.
                         *
                         * <p>A delivery is rejected (and produces no element) when the codec fails to
                         * decode it, or when the decoded message lacks a message id or a timestamp;
                         * otherwise one {@code EventMessage} is emitted for it.
                         */
                        @Override // lepus.std.EventConsumer
                        public Stream events() {
                            NormalMessagingChannel messaging = this.ch$8.messaging();
                            // NOTE(review): the explicit `false` is the 3rd consumeRaw argument — presumably
                            // `noAck`, i.e. deliveries must be acked or rejected explicitly; confirm.
                            return messaging.consumeRaw(this.q$3, messaging.consumeRaw$default$2(), false, messaging.consumeRaw$default$4(), messaging.consumeRaw$default$5(), messaging.consumeRaw$default$6()).flatMap(deliveredMessage -> {
                                Left decode = this.topic$8.codec().decode(deliveredMessage.message());
                                if (decode instanceof Left) {
                                    // Undecodable payload: reject the delivery, emit nothing.
                                    return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                                }
                                if (!(decode instanceof Right)) {
                                    throw new MatchError(decode);
                                }
                                Message message = (Message) ((Right) decode).value();
                                // Both message id and timestamp are required to build an EventMessage;
                                // when either is absent the fold's first branch rejects the delivery.
                                // `r1`/`r2` are unresolved decompiler placeholders for captured values.
                                return (Stream) message.properties().messageId().flatMap((v2) -> {
                                    return EventChannel$.lepus$std$EventChannel$$anon$2$$_$_$$anonfun$1(r1, r2, v2);
                                }).fold(() -> {
                                    return r1.events$$anonfun$1$$anonfun$1(r2);
                                }, EventChannel$::lepus$std$EventChannel$$anon$2$$_$events$$anonfun$1$$anonfun$2);
                            }, NotGiven$.MODULE$.value());
                        }

                        /** Acknowledges the delivery identified by the event's tag. */
                        @Override // lepus.std.EventConsumer
                        public Object processed(EventMessage eventMessage) {
                            NormalMessagingChannel messaging = this.ch$8.messaging();
                            return messaging.ack(eventMessage.tag(), messaging.ack$default$2());
                        }

                        /** Rejects the delivery identified by the event's tag. */
                        @Override // lepus.std.EventConsumer
                        public Object reject(EventMessage eventMessage) {
                            return reject(eventMessage.tag());
                        }

                        // Rejects by delivery tag.
                        // NOTE(review): the `false` is presumably `requeue` (rejected messages are not requeued) — confirm.
                        private Object reject(long j) {
                            return this.ch$8.messaging().reject(j, false);
                        }

                        // Fallback for a message without id/timestamp: reject it and emit nothing.
                        private final Stream events$$anonfun$1$$anonfun$1(DeliveredMessage deliveredMessage) {
                            return Stream$.MODULE$.exec(reject(deliveredMessage.deliveryTag()));
                        }
                    };
                });
            });
        });
    }

    /**
     * Default value for the second parameter ({@code option}, the queue name) of
     * {@code consumer}: no queue name.
     *
     * @return {@code None}
     */
    public <F, T> Option<String> consumer$default$2() {
        final None$ noQueueName = None$.MODULE$;
        return noQueueName;
    }

    /**
     * Overload taking an explicit queue name: wraps {@code str} in a {@code Some} and
     * delegates to the main {@code consumer} method.
     *
     * @param topicDefinition topic to consume
     * @param channel         channel used by the consumer
     * @param str             name of the queue to declare and consume from
     * @param seq             selectors forwarded unchanged
     * @param genConcurrent   type-class instance forwarded unchanged
     * @return whatever the main {@code consumer} method returns
     */
    public <F, T> Object consumer(TopicDefinition<T> topicDefinition, Channel<F, NormalMessagingChannel<F>> channel, String str, Seq<String> seq, GenConcurrent<F, Throwable> genConcurrent) {
        final Option<String> queueName = (Option<String>) Some$.MODULE$.apply(str);
        return consumer(topicDefinition, queueName, seq, channel, genConcurrent);
    }

    /**
     * Overload without a queue name: passes {@code None} as the queue name and delegates
     * to the main {@code consumer} method.
     *
     * @param topicDefinition topic to consume
     * @param channel         channel used by the consumer
     * @param seq             selectors forwarded unchanged
     * @param genConcurrent   type-class instance forwarded unchanged
     * @return whatever the main {@code consumer} method returns
     */
    public <F, T> Object consumer(TopicDefinition<T> topicDefinition, Channel<F, NormalMessagingChannel<F>> channel, Seq<String> seq, GenConcurrent<F, Throwable> genConcurrent) {
        final Option<String> noQueueName = (Option<String>) None$.MODULE$;
        return consumer(topicDefinition, noQueueName, seq, channel, genConcurrent);
    }

    /**
     * Lambda body used when the publisher is constructed: converts a duration to a
     * {@code Timestamp} built from its value in milliseconds.
     *
     * @param finiteDuration duration to convert
     * @return result of {@code Timestamp.apply} on the duration's milliseconds
     */
    public static final /* synthetic */ long lepus$std$EventChannel$$anon$1$$_$$lessinit$greater$$anonfun$1(FiniteDuration finiteDuration) {
        final Domains$package$Timestamp$ timestamps = Domains$package$Timestamp$.MODULE$;
        final long millis = finiteDuration.toMillis();
        return timestamps.apply(millis);
    }

    /**
     * Lambda body used by {@code publish}: wraps a string in an {@code InvalidTopicName}.
     *
     * @param str value handed to {@code InvalidTopicName.apply}
     * @return the resulting {@code InvalidTopicName}
     */
    public static final /* synthetic */ EventChannel.InvalidTopicName lepus$std$EventChannel$$anon$1$$_$publish$$anonfun$1(String str) {
        final EventChannel$InvalidTopicName$ invalidTopicNames = EventChannel$InvalidTopicName$.MODULE$;
        return invalidTopicNames.apply(str);
    }

    /**
     * Supplies the error used by {@code consumer} when the queue-declare response is absent.
     *
     * @return a new {@code UnknownError} with a fixed message
     */
    private static final Throwable consumer$$anonfun$1$$anonfun$1$$anonfun$1() {
        final String reason = "Must respond with Queue name";
        return new UnknownError(reason);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assembles an {@code EventMessage} from a decoded delivery.
     *
     * @param str              first argument of {@code EventMessage.apply} (the message id at the call site)
     * @param message          decoded message whose payload is used
     * @param deliveredMessage raw delivery whose delivery tag is used
     * @param j                timestamp, converted with {@code Timestamp.toInstant}
     * @return the assembled {@code EventMessage}
     */
    public static final /* synthetic */ EventMessage $anonfun$1$$anonfun$1(String str, Message message, DeliveredMessage deliveredMessage, long j) {
        final EventMessage$ eventMessages = EventMessage$.MODULE$;
        final Domains$package$Timestamp$ timestamps = Domains$package$Timestamp$.MODULE$;
        return eventMessages.apply(
                str,
                timestamps.toInstant(j),
                message.payload(),
                deliveredMessage.deliveryTag());
    }

    /**
     * Lambda body applied to a message id: maps the message's optional timestamp to an
     * {@code EventMessage}; the result is empty when the timestamp is absent.
     *
     * @param message          decoded message (source of timestamp and payload)
     * @param deliveredMessage raw delivery (source of the delivery tag)
     * @param str              message id
     * @return the optional {@code EventMessage}
     */
    public static final /* synthetic */ Option lepus$std$EventChannel$$anon$2$$_$_$$anonfun$1(Message message, DeliveredMessage deliveredMessage, String str) {
        return message
                .properties()
                .timestamp()
                .map(boxedTimestamp -> $anonfun$1$$anonfun$1(str, message, deliveredMessage, BoxesRunTime.unboxToLong(boxedTimestamp)));
    }

    /**
     * Lambda body used by {@code events}: lifts one event into a stream via {@code Stream.emit}.
     *
     * @param eventMessage event to emit
     * @return the stream produced by {@code Stream.emit(eventMessage)}
     */
    public static final /* synthetic */ Stream lepus$std$EventChannel$$anon$2$$_$events$$anonfun$1$$anonfun$2(EventMessage eventMessage) {
        final Stream$ streams = Stream$.MODULE$;
        return streams.emit(eventMessage);
    }
}
