package org.impalaframework.extension.event;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

/* loaded from: input_file:org/impalaframework/extension/event/BaseAsynchronousEventService.class */
public abstract class BaseAsynchronousEventService implements EventService, InitializingBean, DisposableBean {
    private static final Log logger = LogFactory.getLog(BaseAsynchronousEventService.class);
    private ScheduledExecutorService queueExecutorService;
    private EventListenerRegistry eventListenerRegistry;
    private EventTaskFactory eventTaskFactory;
    private static final int DEFAULT_POLL_INTERVAL = 1000;
    private static final int DEFAULT_DELAY = 1000;
    private Integer delayInMilliseconds;
    private Integer pollIntervalInMilliseconds;
    private PriorityBlockingQueue<Object> priorityEventQueue = new PriorityBlockingQueue<>();
    private AtomicBoolean started = new AtomicBoolean(false);

    @Override // org.impalaframework.extension.event.EventService
    public void submitEvent(Event event) {
        if (!this.started.get()) {
            throw new IllegalStateException("Cannot accept events as event manager has stopped");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Adding event " + event + " to the event queue");
        }
        try {
            doSubmitEvent(event);
        } catch (ClassCastException e) {
            logger.error("Unexpected class cast exception.", e);
        }
    }

    protected void doSubmitEvent(Event event) {
        addEventToQueue(event);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void addEventToQueue(Event event) {
        this.priorityEventQueue.offer(event);
    }

    public final void start() {
        Assert.notNull(this.eventListenerRegistry, "eventListenerRegistry cannot be null");
        Assert.notNull(this.eventTaskFactory, "eventTaskFactory cannot be null");
        if (this.pollIntervalInMilliseconds == null) {
            this.pollIntervalInMilliseconds = 1000;
        }
        if (this.delayInMilliseconds == null) {
            this.delayInMilliseconds = 1000;
        }
        logger.info("Starting event manager, with single threaded queue executor and cached thread pool task executor");
        this.queueExecutorService = Executors.newSingleThreadScheduledExecutor();
        this.queueExecutorService.scheduleAtFixedRate(new Runnable() { // from class: org.impalaframework.extension.event.BaseAsynchronousEventService.1
            @Override // java.lang.Runnable
            public void run() {
                BaseAsynchronousEventService.this.consumeQueuedEvent();
            }
        }, this.delayInMilliseconds.intValue(), this.pollIntervalInMilliseconds.intValue(), TimeUnit.MILLISECONDS);
        this.started.set(true);
        logger.info("Finished starting event manager");
        afterStart();
    }

    public final void stop() {
        beforeStop();
        this.started.set(false);
        logger.info("Stopping event manager");
        if (!this.queueExecutorService.isShutdown()) {
            shutdown(this.queueExecutorService);
        }
        this.priorityEventQueue.clear();
        this.eventListenerRegistry.clear();
        logger.info("Finished stopping event manager");
    }

    protected void afterStart() {
    }

    protected void beforeStop() {
    }

    public void afterPropertiesSet() throws Exception {
        start();
    }

    public void destroy() throws Exception {
        stop();
    }

    void consumeQueuedEvent() {
        boolean z = true;
        while (z && this.started.get()) {
            try {
                Event event = (Event) this.priorityEventQueue.peek();
                boolean isDebugEnabled = logger.isDebugEnabled();
                if (event != null) {
                    if (isDebugEnabled) {
                        logger.debug("Removing event " + event + " from the event queue");
                    }
                    Date processedByDate = event.getProcessedByDate();
                    if (processedByDate.getTime() <= System.currentTimeMillis()) {
                        if (isDebugEnabled) {
                            logger.debug("Processing event: " + event);
                        }
                        this.priorityEventQueue.remove(event);
                        processQueuedEvent(event);
                    } else if (isDebugEnabled) {
                        System.out.println("Not processing event: still waiting as process by date is still in the future " + processedByDate);
                    }
                } else {
                    if (isDebugEnabled) {
                        logger.debug("No event found on queue");
                    }
                    z = false;
                }
            } catch (Exception e) {
                logger.error("Error processing queued event", e);
            }
        }
    }

    private void processQueuedEvent(Event event) {
        String type = event.getEventType().getType();
        List<EventListener> eventListeners = this.eventListenerRegistry.getEventListeners(type);
        if (logger.isDebugEnabled()) {
            logger.debug("Registered asynchronous listeners registered for type " + type + ": " + eventListeners);
        }
        if (eventListeners.size() <= 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("No tasks submitted for event " + event + " as listener list is empty.");
                return;
            }
            return;
        }
        ArrayList arrayList = new ArrayList();
        for (EventListener eventListener : eventListeners) {
            String consumerName = eventListener.getConsumerName();
            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Creating event task for '" + consumerName + "' for event: " + event);
                }
                arrayList.add(newEventTask(event, eventListener));
            } catch (Exception e) {
                try {
                    onEventError(event, eventListener, e);
                } catch (Exception e2) {
                    logger.error("Event error logging failed: " + e2, e2);
                    logger.error("Original error: " + e.getMessage(), e);
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Submitting " + arrayList.size() + " tasks for event: " + event);
        }
        submitEventTasks(arrayList);
    }

    protected abstract void submitEventTasks(List<EventTask> list);

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutdown(ExecutorService executorService) {
        try {
            executorService.shutdown();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    protected void onEventError(Event event, EventListener eventListener, Exception exc) {
        logger.error("Failed to successfully perform event processing using listener: " + eventListener.getConsumerName(), exc);
    }

    protected EventTask newEventTask(Event event, EventListener eventListener) {
        Assert.notNull(this.eventTaskFactory);
        return this.eventTaskFactory.newEventTask(event, eventListener);
    }

    public int getEventQueueSize() {
        return this.priorityEventQueue.size();
    }

    public boolean isActive() {
        return this.started.get() && !this.queueExecutorService.isShutdown();
    }

    public boolean isStopped() {
        return !this.started.get() && this.queueExecutorService.isShutdown();
    }

    public void setEventTaskFactory(EventTaskFactory eventTaskFactory) {
        this.eventTaskFactory = eventTaskFactory;
    }

    public void setEventListenerRegistry(EventListenerRegistry eventListenerRegistry) {
        this.eventListenerRegistry = eventListenerRegistry;
    }

    public void setPollIntervalInMilliseconds(Integer num) {
        this.pollIntervalInMilliseconds = num;
    }

    public void setDelayInMilliseconds(Integer num) {
        this.delayInMilliseconds = num;
    }
}
