package io.vlingo.http.resource.sse;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Stoppable;
import io.vlingo.actors.World;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.http.Method;
import io.vlingo.http.Request;
import io.vlingo.http.RequestHeader;
import io.vlingo.http.Response;
import io.vlingo.http.ResponseHeader;
import io.vlingo.http.resource.ResourceHandler;
import io.vlingo.wire.channel.RequestResponseContext;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/vlingo/http/resource/sse/SseStreamResource.class */
/**
 * Resource handler for subscribing to and unsubscribing from named
 * Server-Sent Events streams.
 *
 * <p>One {@link SsePublisher} is kept per stream name in a static map, so the
 * publishers are shared by every instance of this resource in the JVM.
 */
public class SseStreamResource extends ResourceHandler {
    /** Stream name to publisher; static, hence shared across all resource instances. */
    private static final Map<String, SsePublisher> publishers = new ConcurrentHashMap<>();

    /** World used to create publisher actors on demand. */
    private final World world;

    /**
     * Actor that holds the subscribers of a single named stream and, on each
     * scheduled interval signal, hands the current subscribers to its
     * {@link SseFeed}.
     */
    public static class SsePublisherActor extends Actor implements SsePublisher, Scheduled<Object>, Stoppable {
        /**
         * Delay handed to the scheduler before the first interval signal.
         * NOTE(review): the unit is whatever the scheduler expects
         * (presumably milliseconds) - confirm against the scheduler API.
         */
        private static final long INITIAL_SIGNAL_DELAY = 10L;

        /** Handle used to cancel the recurring interval signal when stopping. */
        private final Cancellable cancellable;

        /** Feed asked to deliver to the subscribers on every interval signal. */
        private final SseFeed feed;

        /** Name of the stream this publisher serves. */
        private final String streamName;

        /** Current subscribers keyed by {@code SseSubscriber.id()}. */
        private final Map<String, SseSubscriber> subscribers = new HashMap<>();

        /**
         * Creates the feed actor, schedules the recurring interval signal and
         * logs that the publisher has started.
         *
         * @param str  name of the stream; also the first feed parameter
         * @param cls  actor class used to instantiate the {@link SseFeed}
         * @param i    second feed parameter; its meaning is defined by the feed
         *             (presumably a payload size - confirm)
         * @param i2   interval handed to the scheduler between signals
         * @param str2 third feed parameter; its meaning is defined by the feed
         *             (presumably a default event id - confirm)
         */
        public SsePublisherActor(String str, Class<? extends Actor> cls, int i, int i2, String str2) {
            this.streamName = str;
            // The int is autoboxed into the Object[]; no explicit Integer.valueOf needed.
            this.feed = (SseFeed) stage().actorFor(SseFeed.class, Definition.has(cls, Definition.parameters(new Object[]{str, i, str2})));
            this.cancellable = stage().scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, INITIAL_SIGNAL_DELAY, i2);
            logger().log("SsePublisher started for: " + this.streamName);
        }

        /**
         * Registers the subscriber under its id, replacing any subscriber
         * previously stored under the same id.
         *
         * @param sseSubscriber the subscriber to add
         */
        @Override // io.vlingo.http.resource.sse.SsePublisher
        public void subscribe(SseSubscriber sseSubscriber) {
            this.subscribers.put(sseSubscriber.id(), sseSubscriber);
        }

        /**
         * Closes the given subscriber and removes the entry stored under its id.
         * The given instance is closed even if it was never subscribed.
         *
         * @param sseSubscriber the subscriber to close and remove
         */
        @Override // io.vlingo.http.resource.sse.SsePublisher
        public void unsubscribe(SseSubscriber sseSubscriber) {
            sseSubscriber.close();
            this.subscribers.remove(sseSubscriber.id());
        }

        /**
         * Scheduler callback: passes all current subscribers to the feed.
         *
         * @param scheduled unused
         * @param obj       unused
         */
        public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
            this.feed.to(this.subscribers.values());
        }

        /**
         * Cancels the interval signal, unsubscribes every subscriber and then
         * delegates to the superclass stop.
         */
        public void stop() {
            this.cancellable.cancel();
            unsubscribeAll();
            super.stop();
        }

        /** Unsubscribes (and thereby closes) every current subscriber. */
        private void unsubscribeAll() {
            // Iterate over a snapshot: unsubscribe() removes from the map being walked.
            final SseSubscriber[] snapshot = this.subscribers.values().toArray(new SseSubscriber[0]);
            for (final SseSubscriber subscriber : snapshot) {
                unsubscribe(subscriber);
            }
        }
    }

    /**
     * Creates the resource.
     *
     * @param world the world in which publisher actors are created
     */
    public SseStreamResource(World world) {
        this.world = world;
    }

    /**
     * Subscribes the requesting client to the named stream and completes with
     * an {@code Ok} response carrying the correlation id header.
     *
     * <p>Before subscribing, a DELETE request for this client is handed to the
     * client context via {@code whenClosing}. The correlation id is read from
     * the {@code X-Correlation-ID} header and the last event id from the
     * {@code RequestHeader.LastEventID} header; both default to the empty string.
     *
     * @param str  name of the stream to subscribe to
     * @param cls  feed actor class, used only if the publisher must be created
     * @param i    feed parameter forwarded to a newly created publisher
     * @param i2   signal interval forwarded to a newly created publisher
     * @param str2 feed parameter forwarded to a newly created publisher
     */
    public void subscribeToStream(String str, Class<? extends Actor> cls, int i, int i2, String str2) {
        final RequestResponseContext<?> clientContext = context().clientContext();
        // May register null: unsubscribeRequest() returns null if the request cannot be built.
        clientContext.whenClosing(unsubscribeRequest());

        final String correlationId = context().request().headerValueOr("X-Correlation-ID", "");

        // Resolve the publisher before building the subscriber, as the original expression did.
        final SsePublisher publisher = publisherFor(str, cls, i, i2, str2);
        final String lastEventId = context().request().headerValueOr(RequestHeader.LastEventID, "");
        publisher.subscribe(new SseSubscriber(str, new SseClient(clientContext), correlationId, lastEventId));

        completes().with(Response.of(Response.Status.Ok, ResponseHeader.headers(ResponseHeader.correlationId(correlationId))));
    }

    /**
     * Unsubscribes the requesting client from the named stream, if a publisher
     * exists for it, and always completes with an {@code Ok} response.
     *
     * @param str  name of the stream to unsubscribe from
     * @param str2 not used by this method; the subscriber is built from the
     *             current client context instead
     */
    public void unsubscribeFromStream(String str, String str2) {
        final SsePublisher publisher = publishers.get(str);
        if (publisher != null) {
            publisher.unsubscribe(new SseSubscriber(str, new SseClient(context().clientContext())));
        }
        completes().with(Response.of(Response.Status.Ok));
    }

    /**
     * Returns the publisher registered for the stream name, creating and
     * registering one if none exists yet.
     *
     * @param str  name of the stream
     * @param cls  feed actor class for a newly created publisher
     * @param i    feed parameter for a newly created publisher
     * @param i2   signal interval for a newly created publisher
     * @param str2 feed parameter for a newly created publisher
     * @return the publisher now registered under {@code str}
     */
    private SsePublisher publisherFor(String str, Class<? extends Actor> cls, int i, int i2, String str2) {
        final SsePublisher existing = publishers.get(str);
        if (existing != null) {
            return existing;
        }

        final SsePublisher created = (SsePublisher) this.world.actorFor(SsePublisher.class, Definition.has(SsePublisherActor.class, Definition.parameters(new Object[]{str, cls, i, i2, str2})));
        final SsePublisher raced = publishers.putIfAbsent(str, created);
        if (raced == null) {
            return created;
        }

        // Another caller registered a publisher first: discard ours and use theirs.
        created.stop();
        return raced;
    }

    /**
     * Builds a DELETE request whose URI is the current request path followed
     * by {@code "/"} and the client context id.
     *
     * @return the request, or {@code null} if building it throws any exception
     */
    private Request unsubscribeRequest() {
        try {
            return Request.has(Method.DELETE).and(new URI(context().request().uri.getPath() + "/" + context().clientContext().id()));
        } catch (Exception ignored) {
            // Deliberately best-effort: a failure to build the request must not
            // fail the subscription, so the caller receives null instead.
            return null;
        }
    }
}
