package lepus.std;

import cats.effect.kernel.GenConcurrent;
import cats.syntax.ApplicativeErrorIdOps$;
import cats.syntax.package$all$;
import fs2.Stream;
import fs2.Stream$;
import java.io.Serializable;
import lepus.client.Channel;
import lepus.client.DeliveredMessage;
import lepus.client.Message;
import lepus.client.MessageTypes$package$;
import lepus.client.QueueAPI;
import lepus.client.apis.NormalMessagingChannel;
import lepus.protocol.domains.Domains$package$ExchangeName$;
import scala.runtime.ModuleSerializationProxy;
import scala.util.NotGiven$;

/* compiled from: WorkPoolChannel.scala */
/* loaded from: input_file:lepus/std/WorkPoolChannel$.class */
public final class WorkPoolChannel$ implements Serializable {
    public static final WorkPoolChannel$ MODULE$ = new WorkPoolChannel$();

    private WorkPoolChannel$() {
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(WorkPoolChannel$.class);
    }

    public <F, T> Object publisher(WorkPoolDefinition<T> workPoolDefinition, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent) {
        package$all$ package_all_ = package$all$.MODULE$;
        QueueAPI queue = channel.queue();
        return package_all_.toFunctorOps(queue.declare(workPoolDefinition.name(), queue.declare$default$2(), true, queue.declare$default$4(), queue.declare$default$5(), queue.declare$default$6(), queue.declare$default$7()), genConcurrent).map(option -> {
            return new WorkPoolServer<F, T>(workPoolDefinition, genConcurrent, channel) { // from class: lepus.std.WorkPoolChannel$$anon$1
                private final WorkPoolDefinition pool$2;
                private final GenConcurrent evidence$1$2;
                private final Channel ch$2;

                {
                    this.pool$2 = workPoolDefinition;
                    this.evidence$1$2 = genConcurrent;
                    this.ch$2 = channel;
                }

                @Override // lepus.std.WorkPoolServer
                public Object publish(Object obj) {
                    return this.pool$2.codec().encode((ChannelCodec) obj).fold(th -> {
                        return ApplicativeErrorIdOps$.MODULE$.raiseError$extension((Throwable) package$all$.MODULE$.catsSyntaxApplicativeErrorId(th), this.evidence$1$2);
                    }, message -> {
                        return this.ch$2.messaging().publishRaw(MessageTypes$package$.MODULE$.EnvelopeRaw().apply(Domains$package$ExchangeName$.MODULE$.default(), this.pool$2.name(), false, message));
                    });
                }
            };
        });
    }

    public <F, T> Object worker(WorkPoolDefinition<T> workPoolDefinition, Channel<F, NormalMessagingChannel<F>> channel, GenConcurrent<F, Throwable> genConcurrent) {
        package$all$ package_all_ = package$all$.MODULE$;
        QueueAPI queue = channel.queue();
        return package_all_.toFunctorOps(queue.declare(workPoolDefinition.name(), queue.declare$default$2(), true, queue.declare$default$4(), queue.declare$default$5(), queue.declare$default$6(), queue.declare$default$7()), genConcurrent).map(option -> {
            return new WorkPoolClient<F, T>(channel, workPoolDefinition) { // from class: lepus.std.WorkPoolChannel$$anon$2
                private final Channel ch$4;
                private final WorkPoolDefinition pool$4;

                {
                    this.ch$4 = channel;
                    this.pool$4 = workPoolDefinition;
                }

                @Override // lepus.std.WorkPoolClient
                public Stream jobs() {
                    NormalMessagingChannel messaging = this.ch$4.messaging();
                    return messaging.consumeRaw(this.pool$4.name(), messaging.consumeRaw$default$2(), false, messaging.consumeRaw$default$4(), messaging.consumeRaw$default$5(), messaging.consumeRaw$default$6()).flatMap(deliveredMessage -> {
                        return (Stream) this.pool$4.codec().decode(deliveredMessage.message()).fold(th -> {
                            return Stream$.MODULE$.exec(this.ch$4.messaging().reject(deliveredMessage.deliveryTag(), false));
                        }, (v1) -> {
                            return WorkPoolChannel$.lepus$std$WorkPoolChannel$$anon$2$$_$jobs$$anonfun$1$$anonfun$2(r2, v1);
                        });
                    }, NotGiven$.MODULE$.value());
                }

                @Override // lepus.std.WorkPoolClient
                public Object processed(Job job) {
                    NormalMessagingChannel messaging = this.ch$4.messaging();
                    return messaging.ack(job.tag(), messaging.ack$default$2());
                }
            };
        });
    }

    public static final /* synthetic */ Stream lepus$std$WorkPoolChannel$$anon$2$$_$jobs$$anonfun$1$$anonfun$2(DeliveredMessage deliveredMessage, Message message) {
        return Stream$.MODULE$.emit(Job$.MODULE$.apply(message.payload(), deliveredMessage.deliveryTag()));
    }
}
