package polynote.server;

import izumi.reflect.Tag$;
import izumi.reflect.macrortti.LightTypeTag$;
import java.util.concurrent.atomic.AtomicInteger;
import polynote.env.ops.Location;
import polynote.kernel.environment.Env$;
import polynote.kernel.environment.Env$AddManaged$;
import polynote.kernel.logging.package$Logging$;
import polynote.kernel.package$StreamingHandles$;
import polynote.kernel.util.Publish;
import polynote.kernel.util.Publish$;
import polynote.messages.Error;
import polynote.messages.Message;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$DummyImplicit$;
import scala.Tuple2;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import uzhttp.HTTPError;
import uzhttp.websocket.Frame;
import zio.CanFail$;
import zio.Has;
import zio.NeedsEnv$;
import zio.Promise;
import zio.Promise$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZManaged;
import zio.ZManaged$;
import zio.ZQueue;
import zio.ZQueue$;
import zio.blocking.package;
import zio.stream.Take;
import zio.stream.Take$;
import zio.stream.ZStream;
import zio.stream.ZStream$;

/* compiled from: NotebookSession.scala */
/* loaded from: input_file:polynote/server/NotebookSession$.class */
/**
 * Decompiled Scala module (singleton) class for {@code NotebookSession}.
 *
 * <p>Exposes {@link #stream} which wires an incoming websocket frame stream to a
 * {@code NotebookSession} and produces the outgoing frame stream, plus a process-wide
 * session id counter ({@link #nextSessionId()}).
 *
 * <p>NOTE(review): this is decompiler output; the nested lambdas mirror what was
 * presumably a single for-comprehension in the Scala source — confirm against
 * NotebookSession.scala before refactoring.
 */
public final class NotebookSession$ {
    /** Singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static NotebookSession$ MODULE$;
    /** Counter backing {@link #nextSessionId()}; initialised to 0 in the constructor. */
    private final AtomicInteger sessionId;

    static {
        // Instantiating the class is what populates MODULE$ (see the constructor).
        new NotebookSession$();
    }

    /**
     * Builds the managed outgoing frame stream for a notebook websocket session.
     *
     * <p>Steps, in order: validate the path via {@code NotebookManager.assertValidPath};
     * create an unbounded queue; add a {@code Publish} wrapping that queue to the
     * environment; subscribe to the notebook at {@code str}; obtain the next session id;
     * then delegate to {@link #$anonfun$stream$6} to assemble the streams.
     *
     * <p>Error handling (outer {@code catchAll}): an {@link HTTPError} is re-raised as the
     * failure of the returned managed value; any other throwable is logged and replaced by
     * a successful, empty stream.
     *
     * @param str     notebook path; validated first and also used as the payload of the
     *                {@code HTTPError.NotFound} raised when subscribing fails
     * @param zStream incoming frames, handed to the message-handling stream
     * @param publish publisher passed to the {@code NotebookSession} constructor
     * @return a managed value yielding the outgoing frame stream, failing only with {@code HTTPError}
     */
    public ZManaged<Has<package.Blocking.Service>, HTTPError, ZStream<Object, Nothing$, Frame>> stream(String str, ZStream<Object, Throwable, Frame> zStream, Publish<Object, Nothing$, Message> publish) {
        return package$NotebookManager$.MODULE$.assertValidPath(str).toManaged_().flatMap(boxedUnit -> {
            return ZQueue$.MODULE$.unbounded().toManaged_().flatMap(zQueue -> {
                // publish2 (the Publish built from zQueue and added to the environment) is not
                // referenced inside the lambda body; the session below receives the outer
                // `publish` parameter instead.
                return Env$AddManaged$.MODULE$.flatMap$extension(Env$.MODULE$.addManaged().apply(Publish$.MODULE$.apply(zQueue, Predef$DummyImplicit$.MODULE$.dummyImplicit())), publish2 -> {
                    // Any subscribe failure is collapsed into HTTPError.NotFound(str).
                    return package$NotebookManager$.MODULE$.subscribe(str).orElseFail(() -> {
                        return new HTTPError.NotFound(str);
                    }, CanFail$.MODULE$.canFail()).flatMap(kernelSubscriber -> {
                        return MODULE$.nextSessionId().toManaged_().flatMap(obj -> {
                            // The session id arrives boxed; unbox before passing it on.
                            return $anonfun$stream$6(kernelSubscriber, publish, zQueue, zStream, BoxesRunTime.unboxToInt(obj));
                        });
                    });
                    // The two Tag arguments below are compiler-generated izumi-reflect type tags
                    // (for Has[Publish[Any, Nothing, Message]] and Publish[Any, Nothing, Message]
                    // per the embedded type names). The string payloads are serialized data — do not edit.
                }, Tag$.MODULE$.apply(Has.class, LightTypeTag$.MODULE$.parse(-108628489, "\u0001��\u0007zio.Has\u0001��\u0001��\u001cpolynote.kernel.util.Publish\u0003��\u0004��\u0001\tscala.Any\u0001\u0001\u0001��\u0004��\u0001\rscala.Nothing\u0001\u0001\u0002��\u0004��\u0001\u0019polynote.messages.Message\u0001\u0001\u0001\u0001��\u0001", "��\u0001\u0001��\u0007zio.Has\u0001��\u0001��\u001cpolynote.kernel.util.Publish\u0003��\u0004��\u0001\tscala.Any\u0001\u0001\u0001��\u0004��\u0001\rscala.Nothing\u0001\u0001\u0002��\u0004��\u0001\u0019polynote.messages.Message\u0001\u0001\u0001\u0001��\u0001\u0002\u0004��\u0001\u0014java.io.Serializable\u0001\u0001\u0004��\u0001\u0012scala.Serializable\u0001\u0001\u0001��\u0001\u0090\u0002\u0001\u0001\u0002��\u0001\u0090\u0007\u0001\u0001��\u0001\u0090\b\u0001\u0001", 11)), Tag$.MODULE$.apply(Publish.class, LightTypeTag$.MODULE$.parse(330561796, "\u0001��\u001cpolynote.kernel.util.Publish\u0003��\u0004��\u0001\tscala.Any\u0001\u0001\u0001��\u0004��\u0001\rscala.Nothing\u0001\u0001\u0002��\u0004��\u0001\u0019polynote.messages.Message\u0001\u0001\u0001\u0001", "������", 11)));
            });
        }).catchAll(th -> {
            ZManaged $times$greater;
            if (th instanceof HTTPError) {
                // HTTP errors are propagated unchanged to the caller.
                HTTPError hTTPError = (HTTPError) th;
                $times$greater = ZManaged$.MODULE$.fail(() -> {
                    return hTTPError;
                });
            } else {
                // Anything else is logged, then swallowed: the caller gets an empty stream.
                // NOTE(review): 198 is presumably the line number in NotebookSession.scala — confirm.
                $times$greater = package$Logging$.MODULE$.error(th, new Location("NotebookSession.scala", 198, "stream", "polynote.server.NotebookSession")).toManaged_().$times$greater(ZManaged$.MODULE$.succeed(() -> {
                    return ZStream$.MODULE$.empty();
                }));
            }
            return $times$greater;
        }, CanFail$.MODULE$.canFail());
    }

    /** Accessor for the session id counter. */
    private AtomicInteger sessionId() {
        return this.sessionId;
    }

    /**
     * Effect that yields the current session id and then increments the counter
     * ({@code AtomicInteger.getAndIncrement}). The value is a boxed int, hence the
     * {@code Object} in the signature.
     */
    public ZIO<Object, Nothing$, Object> nextSessionId() {
        return ZIO$.MODULE$.effectTotal(() -> {
            return MODULE$.sessionId().getAndIncrement();
        });
    }

    /**
     * Synthetic continuation of {@link #stream} once the session id is known.
     *
     * <p>Creates the streaming handles for session {@code i} (failures are turned into
     * defects via {@code orDie}), creates a {@code Promise}, constructs the
     * {@code NotebookSession}, captures the current environment, sends the notebook, and
     * finally combines three streams with {@code parallelStreams}:
     * <ol>
     *   <li>frames produced from the contents of {@code zQueue};</li>
     *   <li>the message-handling stream over the incoming {@code zStream};</li>
     *   <li>the keepalive stream tied to the promise.</li>
     * </ol>
     * Errors from the combined stream are logged and the stream is drained; the captured
     * environment is then provided to the result.
     *
     * @param kernelSubscriber subscriber obtained from {@code NotebookManager.subscribe}
     * @param publish          publisher handed to the {@code NotebookSession} constructor
     * @param zQueue           queue whose contents become outgoing frames; also receives error messages
     * @param zStream          incoming frame stream
     * @param i                session id used to create the streaming handles
     */
    public static final /* synthetic */ ZManaged $anonfun$stream$6(KernelSubscriber kernelSubscriber, Publish publish, ZQueue zQueue, ZStream zStream, int i) {
        return package$StreamingHandles$.MODULE$.make(i).orDie(Predef$.MODULE$.$conforms(), CanFail$.MODULE$.canFail()).toManaged_().flatMap(service -> {
            return Promise$.MODULE$.make().toManaged_().map(promise -> {
                return new Tuple2(promise, new NotebookSession(kernelSubscriber, service, publish));
            }).flatMap(tuple2 -> {
                // Compiler-generated pattern-match guard for destructuring the tuple.
                if (tuple2 == null) {
                    throw new MatchError(tuple2);
                }
                Promise promise2 = (Promise) tuple2._1();
                NotebookSession notebookSession = (NotebookSession) tuple2._2();
                // Capture the environment now so it can be provided to the final stream below.
                return ZIO$.MODULE$.environment().toManaged_().flatMap(has -> {
                    return notebookSession.sendNotebook().toManaged_().map(boxedUnit -> {
                        // NOTE(review): closeQueueIf(promise2, zQueue) presumably shuts the queue down
                        // when the promise completes — its definition is outside this file; confirm.
                        return package$.MODULE$.parallelStreams(Predef$.MODULE$.wrapRefArray(new ZStream[]{package$.MODULE$.toFrames(ZStream$.MODULE$.fromQueue(zQueue, ZStream$.MODULE$.fromQueue$default$2()).flattenTake(Predef$.MODULE$.$conforms())), package$FrameStreamOps$.MODULE$.handleMessages$extension(package$.MODULE$.FrameStreamOps(zStream), package$.MODULE$.closeQueueIf(promise2, zQueue), message -> {
                            // Each message is handled on a forked fiber; a handler failure is logged
                            // and reported back through the queue as an Error message with code 0.
                            return ((ZIO) notebookSession.handleMessage().apply(message)).catchAll(th -> {
                                return package$Logging$.MODULE$.error(th, new Location("NotebookSession.scala", 189, "stream", "polynote.server.NotebookSession")).$times$greater(() -> {
                                    return zQueue.offer(new Take(Take$.MODULE$.single(new Error(0, th))));
                                });
                            }, CanFail$.MODULE$.canFail()).fork().as(() -> {
                                // The handler itself yields no direct response.
                                return None$.MODULE$;
                            });
                        }), package$.MODULE$.keepaliveStream(promise2)})).catchAll(th -> {
                            // A failure of the combined stream is logged and yields no further elements.
                            return ZStream$.MODULE$.fromEffect(package$Logging$.MODULE$.error("Notebook session is closing due to error", th, new Location("NotebookSession.scala", 194, "stream", "polynote.server.NotebookSession"))).drain();
                        }, CanFail$.MODULE$.canFail()).provide(has, NeedsEnv$.MODULE$.needsEnv());
                    });
                });
            });
        });
    }

    /** Private constructor: publishes this instance as {@link #MODULE$} and starts the session counter at 0. */
    private NotebookSession$() {
        MODULE$ = this;
        this.sessionId = new AtomicInteger(0);
    }
}
