package org.aiflow.notification.client;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.aiflow.notification.entity.EventMeta;
import org.aiflow.notification.proto.NotificationServiceGrpc;
import org.apache.commons.collections4.CollectionUtils;

/* loaded from: input_file:org/aiflow/notification/client/EventListener.class */
public class EventListener {
    private final NotificationServiceGrpc.NotificationServiceBlockingStub serviceStub;
    private final List<String> keys;
    private final long version;
    private final String eventType;
    private final long startTime;
    private final String namespace;
    private final String sender;
    private final EventWatcher watcher;
    private final int timeoutSeconds;
    private volatile boolean isRunning = true;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("listen-notification-%d").build());

    public EventListener(NotificationServiceGrpc.NotificationServiceBlockingStub notificationServiceBlockingStub, List<String> list, long j, String str, long j2, String str2, String str3, EventWatcher eventWatcher, Integer num) {
        this.serviceStub = notificationServiceBlockingStub;
        this.keys = list;
        this.version = j;
        this.eventType = str;
        this.startTime = j2;
        this.namespace = str2;
        this.sender = str3;
        this.watcher = eventWatcher;
        this.timeoutSeconds = num.intValue();
    }

    public void start() {
        this.executorService.submit(listenEvents());
    }

    public void shutdown() {
        this.isRunning = false;
        this.executorService.shutdown();
    }

    public Runnable listenEvents() {
        return () -> {
            long j = this.version;
            while (this.isRunning) {
                try {
                    if (Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    List<EventMeta> listEvents = NotificationClient.listEvents(this.serviceStub, this.namespace, this.sender, this.keys, j, this.eventType, this.startTime, Integer.valueOf(this.timeoutSeconds));
                    if (CollectionUtils.isNotEmpty(listEvents)) {
                        this.watcher.process(listEvents);
                        j = listEvents.get(listEvents.size() - 1).getVersion();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException("Error while listening notification", e);
                }
            }
        };
    }
}
