package io.mantisrx.master.events;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.dispatch.BoundedMessageQueueSemantics;
import akka.dispatch.RequiresMessageQueue;
import com.google.common.collect.EvictingQueue;
import io.mantisrx.master.api.akka.route.proto.JobStatus;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.server.core.Status;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/master/events/StatusEventBrokerActor.class */
public class StatusEventBrokerActor extends AbstractActor implements RequiresMessageQueue<BoundedMessageQueueSemantics> {
    private final ActorRef agentsErrorMonitorActorRef;
    public static final int MAX_STATUS_HISTORY_PER_JOB = 100;
    private final Logger logger = LoggerFactory.getLogger(StatusEventBrokerActor.class);
    private final Map<String, Set<ActorRef>> jobIdToActorMap = new HashMap();
    private final Map<ActorRef, String> actorToJobIdMap = new HashMap();
    private final Map<String, EvictingQueue<Status>> jobIdToStatusEventsBuf = new HashMap();

    /* loaded from: input_file:io/mantisrx/master/events/StatusEventBrokerActor$JobStatusRequest.class */
    public static class JobStatusRequest {
        private final String jobId;

        public JobStatusRequest(String str) {
            this.jobId = str;
        }

        public String getJobId() {
            return this.jobId;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.jobId, ((JobStatusRequest) obj).jobId);
        }

        public int hashCode() {
            return Objects.hash(this.jobId);
        }

        public String toString() {
            return "JobStatusRequest{jobId='" + this.jobId + "'}";
        }
    }

    public static Props props(ActorRef actorRef) {
        return Props.create(StatusEventBrokerActor.class, new Object[]{actorRef});
    }

    public StatusEventBrokerActor(ActorRef actorRef) {
        this.agentsErrorMonitorActorRef = actorRef;
    }

    private void onJobStatusRequest(JobStatusRequest jobStatusRequest) {
        this.logger.debug("got request {}", jobStatusRequest);
        ActorRef sender = sender();
        this.jobIdToActorMap.computeIfAbsent(jobStatusRequest.jobId, str -> {
            return new HashSet();
        });
        this.jobIdToActorMap.get(jobStatusRequest.jobId).add(sender);
        this.actorToJobIdMap.put(sender, jobStatusRequest.jobId);
        getContext().watch(sender);
        EvictingQueue<Status> evictingQueue = this.jobIdToStatusEventsBuf.get(jobStatusRequest.jobId);
        if (evictingQueue != null) {
            evictingQueue.forEach(status -> {
                sender.tell(new JobStatus(status), ActorRef.noSender());
            });
        }
    }

    private void cleanupIfTerminalState(LifecycleEventsProto.StatusEvent statusEvent) {
        if (statusEvent instanceof LifecycleEventsProto.JobStatusEvent) {
            LifecycleEventsProto.JobStatusEvent jobStatusEvent = (LifecycleEventsProto.JobStatusEvent) statusEvent;
            if (JobState.isTerminalState(jobStatusEvent.getJobState())) {
                this.jobIdToStatusEventsBuf.remove(jobStatusEvent.getJobId());
            }
        }
    }

    private void onStatusEvent(LifecycleEventsProto.StatusEvent statusEvent) {
        Status from = LifecycleEventsProto.from(statusEvent);
        String jobId = from.getJobId();
        this.jobIdToStatusEventsBuf.computeIfAbsent(jobId, str -> {
            return EvictingQueue.create(100);
        }).add(from);
        cleanupIfTerminalState(statusEvent);
        Set<ActorRef> set = this.jobIdToActorMap.get(jobId);
        if (set == null || set.isEmpty()) {
            this.logger.debug("Job status dropped, no active subscribers for {}", jobId);
        } else {
            this.logger.debug("Sending job status {}", statusEvent);
            set.forEach(actorRef -> {
                actorRef.tell(new JobStatus(from), self());
            });
        }
        if (statusEvent instanceof LifecycleEventsProto.WorkerStatusEvent) {
            this.agentsErrorMonitorActorRef.tell(statusEvent, getSelf());
        }
    }

    private void onTerminated(Terminated terminated) {
        this.logger.info("actor terminated {}", terminated);
        ActorRef actor = terminated.actor();
        String str = this.actorToJobIdMap.get(actor);
        if (str != null) {
            this.jobIdToActorMap.get(str).remove(actor);
        }
        this.actorToJobIdMap.remove(actor);
    }

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(JobStatusRequest.class, jobStatusRequest -> {
            onJobStatusRequest(jobStatusRequest);
        }).match(LifecycleEventsProto.StatusEvent.class, statusEvent -> {
            onStatusEvent(statusEvent);
        }).match(Terminated.class, terminated -> {
            onTerminated(terminated);
        }).build();
    }
}
