package org.onosproject.event.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.event.AbstractEvent;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventSink;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event dispatcher that queues posted events and delivers them, one at a time,
 * to the sink registered for each event's class.
 *
 * <p>Events are taken from an unbounded blocking queue by a {@link DispatchLoop}
 * running on a single-thread executor. While a time limit is configured
 * (non-zero), a periodic {@link Watchdog} task checks how long the current sink
 * has been running; when the limit is exceeded it notifies the sink, stops the
 * current loop and submits a fresh one.
 */
@Service
@Component(immediate = true)
public class CoreEventDispatcher extends DefaultEventSinkRegistry implements EventDeliveryService {

    /** Default per-event processing limit, in milliseconds. */
    private static final long DEFAULT_EXECUTE_MS = 5000;

    /** Delay and period of the watchdog task, in milliseconds; also the smallest non-zero limit accepted. */
    private static final long WATCHDOG_MS = 250;

    /** Sentinel event; the dispatch loop exits when it takes this exact instance from the queue. */
    private static final Event KILL_PILL = new AbstractEvent(null, 0) {
    };

    private volatile DispatchLoop dispatchLoop;
    private TimerTask watchdog;

    /** Sink most recently handed an event; read by the watchdog for reporting and notification. */
    private volatile EventSink lastSink;
    private volatile Future<?> dispatchFuture;

    // NOTE: 'log' must be initialised before 'executor', whose thread factory receives it.
    private final Logger log = LoggerFactory.getLogger(getClass());

    // Not read anywhere in this class; retained as-is.
    private boolean executionTimeLimit = false;

    private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    private final ExecutorService executor =
            Executors.newSingleThreadExecutor(Tools.groupedThreads("onos/event", "dispatch-%d", log));

    /** Current processing limit in milliseconds; volatile because the watchdog reads it from the timer thread. */
    private volatile long maxProcessMillis = DEFAULT_EXECUTE_MS;

    /** Measures how long the current sink has been processing; started and reset around each delivery. */
    private final Stopwatch stopwatch = Stopwatch.createUnstarted();

    /**
     * Runnable that drains the event queue until it is stopped or takes the kill pill.
     */
    private class DispatchLoop implements Runnable {
        private volatile boolean stopped;

        private DispatchLoop() {
        }

        @Override
        public void run() {
            stopped = false;
            log.info("Dispatch loop initiated");
            while (!stopped) {
                try {
                    // Blocks until an event is available; the kill pill ends the loop.
                    Event event = events.take();
                    if (event == KILL_PILL) {
                        break;
                    }
                    process(event);
                } catch (InterruptedException e) {
                    // Only logged: the loop condition re-checks 'stopped', which the
                    // watchdog sets before it cancels (interrupts) this task.
                    log.warn("Dispatch loop interrupted");
                } catch (Exception | Error e) {
                    // A failing sink must not bring down the dispatch loop.
                    log.warn("Error encountered while dispatching event:", e);
                }
            }
            log.info("Dispatch loop terminated");
        }

        /**
         * Delivers one event to the sink registered for its class, timing the call.
         *
         * @param event event to deliver
         */
        private void process(Event event) {
            EventSink sink = getSink(event.getClass());
            if (sink == null) {
                log.warn("No sink registered for event class {}", event.getClass().getName());
                return;
            }
            lastSink = sink;
            stopwatch.start();
            try {
                sink.process(event);
            } finally {
                // Reset even when the sink throws; otherwise the stopwatch stays
                // running and the next start() fails with IllegalStateException.
                stopwatch.reset();
            }
        }

        /** Requests that the loop exit at its next iteration check. */
        void stop() {
            stopped = true;
        }
    }

    /**
     * Timer task that replaces the dispatch loop when the current sink has been
     * running longer than the configured limit.
     */
    public class Watchdog extends TimerTask {
        private Watchdog() {
        }

        @Override
        public void run() {
            long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (elapsed > maxProcessMillis) {
                stopwatch.reset();
                // Read the volatile once so the sink that is logged is the sink that is notified.
                EventSink sink = lastSink;
                log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
                         sink.getClass().getName(), elapsed);

                // Tell the sink that it exceeded its limit.
                sink.onProcessLimit();

                // Stop and cancel the old loop, then submit a replacement.
                dispatchLoop.stop();
                dispatchLoop = new DispatchLoop();
                dispatchFuture.cancel(true);
                dispatchFuture = executor.submit(dispatchLoop);
            }
        }
    }

    /**
     * Adds the event to the dispatch queue.
     *
     * @param event event to be dispatched
     */
    public void post(Event event) {
        if (!events.add(event)) {
            log.error("Unable to post event {}", event);
        }
    }

    /**
     * Starts the dispatch loop and, when a time limit is configured, the watchdog.
     */
    @Activate
    public void activate() {
        dispatchLoop = new DispatchLoop();
        dispatchFuture = executor.submit(dispatchLoop);
        if (maxProcessMillis != 0) {
            startWatchdog();
        }
        log.info("Started");
    }

    /**
     * Stops the dispatch loop and the watchdog. The kill pill is posted so that a
     * loop blocked on an empty queue wakes up and terminates.
     */
    @Deactivate
    public void deactivate() {
        dispatchLoop.stop();
        stopWatchdog();
        post(KILL_PILL);
        log.info("Stopped");
    }

    /** Creates a new watchdog task and schedules it on the shared timer. */
    private void startWatchdog() {
        log.info("Starting watchdog task");
        watchdog = new Watchdog();
        SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
    }

    /** Cancels the watchdog task, if one was ever created. */
    private void stopWatchdog() {
        log.info("Stopping watchdog task");
        if (watchdog != null) {
            watchdog.cancel();
        }
    }

    /**
     * Sets the per-event processing time limit. A value of 0 disables the
     * watchdog; changing from 0 to a non-zero value starts it.
     *
     * @param millis limit in milliseconds; must be 0 or at least {@code WATCHDOG_MS}
     * @throws IllegalArgumentException if {@code millis} is non-zero and below {@code WATCHDOG_MS}
     */
    public void setDispatchTimeLimit(long millis) {
        AppGuard.checkPermission(AppPermission.Type.EVENT_WRITE);
        Preconditions.checkArgument(millis == 0 || millis >= WATCHDOG_MS,
                                    "Time limit must be greater than %s", WATCHDOG_MS);
        long previous = maxProcessMillis;
        maxProcessMillis = millis;
        if (millis == 0 && previous != 0) {
            stopWatchdog();
        } else if (millis != 0 && previous == 0) {
            startWatchdog();
        }
    }

    /**
     * Returns the per-event processing time limit.
     *
     * @return limit in milliseconds; 0 means no limit is enforced
     */
    public long getDispatchTimeLimit() {
        AppGuard.checkPermission(AppPermission.Type.EVENT_READ);
        return maxProcessMillis;
    }
}
