package org.hertsstack.core.service;

import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.hertsstack.broker.ReactiveBroker;
import org.hertsstack.broker.ReactiveStreamingCache;
import org.hertsstack.broker.ReactiveStreamingCacheImpl;
import org.hertsstack.core.context.SharedServiceContext;

/* loaded from: input_file:org/hertsstack/core/service/HertsBroadCasterImpl.class */
/**
 * {@link HertsBroadCaster} implementation that hands out receiver proxies keyed by client id.
 *
 * <p>Each receiver is a {@link Proxy} implementing the interface configured through
 * {@link #setReceiver(Class)}; its calls are dispatched to a {@code HertsReactiveStreamingInvoker}
 * built from the configured {@link ReactiveBroker} and the client id. Created proxies and the
 * registered stream observers are stored in the cache obtained from
 * {@code ReactiveStreamingCacheImpl.getInstance()}.
 *
 * <p>{@link #setReceiver(Class)} and {@link #setBroker(ReactiveBroker)} must be called before a
 * receiver can be created; otherwise creation fails, the failure is logged, and {@code null} is
 * returned.
 */
class HertsBroadCasterImpl implements HertsBroadCaster {
    private static final Logger logger = Logger.getLogger(HertsBroadCasterImpl.class.getName());

    /** Cache holding client ids, their stream observers and their receiver proxies. */
    private final ReactiveStreamingCache<HertsReceiver> reactiveStreamingCache = ReactiveStreamingCacheImpl.getInstance();

    /** Broker handed to every invoker created for a receiver proxy. */
    private ReactiveBroker broker;

    /** Service class associated with this broadcaster; only stored and returned here. */
    private Class<?> service;

    /** Interface implemented by the receiver proxies this broadcaster creates. */
    private Class<?> receiver;

    /**
     * Returns the receiver for the given client id, creating and caching it on first use.
     *
     * @param str the client (connection) id the receiver is bound to
     * @param <K> the receiver interface type expected by the caller
     * @return the receiver proxy, or {@code null} if it could not be created
     */
    @Override
    @SuppressWarnings("unchecked") // K is chosen by the caller; the proxy implements the configured receiver interface.
    public <K> K broadcast(String str) {
        return (K) createReceiver(str);
    }

    /**
     * Registers the stream observer of the current call under the connection id found in the
     * call context, notifies the observer that registration happened, and makes sure a receiver
     * proxy exists for that id.
     *
     * @param streamObserver the response observer of the current call; it is cast to
     *                       {@link ServerCallStreamObserver} to install a cancel handler
     * @throws ClassCastException if {@code streamObserver} is not a {@link ServerCallStreamObserver}
     */
    @Override
    public void registerReceiver(StreamObserver<Object> streamObserver) {
        ServerCallStreamObserver<Object> serverObserver = (ServerCallStreamObserver<Object>) streamObserver;
        serverObserver.setOnCancelHandler(() -> logger.info("Cancelled internal receiver of Herts"));

        String clientId = getClientId();
        this.reactiveStreamingCache.setClientId(clientId);
        this.reactiveStreamingCache.setObserver(clientId, streamObserver);

        // Tell the client side that registration completed before any receiver call is routed.
        streamObserver.onNext(Collections.singletonList(SharedServiceContext.Rpc.REGISTERED_METHOD_NAME));
        createReceiver(clientId);
    }

    /**
     * Reads the connection id from the current call context.
     *
     * @return the value of {@code HERTS_CONNECTION_ID_CTX} for the current context
     */
    @Override
    public String getClientId() {
        return (String) SharedServiceContext.Header.HERTS_CONNECTION_ID_CTX.get();
    }

    /**
     * Stores the service class associated with this broadcaster.
     *
     * @param cls the service class
     */
    @Override
    public void setService(Class<?> cls) {
        this.service = cls;
    }

    /**
     * Sets the interface that receiver proxies will implement.
     *
     * @param cls the receiver interface
     */
    @Override
    public void setReceiver(Class<?> cls) {
        this.receiver = cls;
    }

    /**
     * Sets the broker passed to the invoker behind every receiver proxy.
     *
     * @param reactiveBroker the broker to use
     */
    @Override
    public void setBroker(ReactiveBroker reactiveBroker) {
        this.broker = reactiveBroker;
    }

    /**
     * Returns the service class previously stored with {@link #setService(Class)}.
     *
     * @return the service class, or {@code null} if none was set
     */
    @Override
    public Class<?> getService() {
        return this.service;
    }

    /**
     * Returns the receiver interface previously stored with {@link #setReceiver(Class)}.
     *
     * @return the receiver interface, or {@code null} if none was set
     */
    @Override
    public Class<?> getReceiver() {
        return this.receiver;
    }

    /**
     * Looks up the cached receiver for a client id, or builds a new proxy and caches it.
     *
     * @param str the client id the receiver is bound to
     * @return the cached or newly created receiver, or {@code null} when creation fails
     */
    private HertsReceiver createReceiver(String str) {
        HertsReceiver cached = (HertsReceiver) this.reactiveStreamingCache.getHertsReceiver(str);
        if (cached != null) {
            return cached;
        }
        // NOTE(review): lookup-then-create is not atomic; confirm whether concurrent calls for the
        // same client id can occur and whether the cache tolerates a second setHertsReceiver.
        try {
            HertsReceiver created = (HertsReceiver) Proxy.newProxyInstance(
                    this.receiver.getClassLoader(),
                    new Class<?>[] {this.receiver},
                    new HertsReactiveStreamingInvoker(this.broker, str));
            this.reactiveStreamingCache.setHertsReceiver(str, created);
            return created;
        } catch (Exception e) {
            // Callers rely on a null result on failure, so keep that contract but report the
            // failure through the class logger, with the cause, instead of printing to stderr.
            logger.log(Level.SEVERE, "Failed to create receiver proxy for client id: " + str, e);
            return null;
        }
    }
}
