package cn.weforward.data.mongodb.persister;

import cn.weforward.common.Nameable;
import cn.weforward.data.mongodb.util.MongodbUtil;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/weforward/data/mongodb/persister/MongodbWatcher.class */
/**
 * Background watcher that opens a change stream on the collection supplied by a
 * {@link DocumentChange} and forwards every change event to it.
 *
 * <p>The watcher starts its own thread in the constructor and keeps re-opening the change
 * stream after failures, sleeping between attempts. {@link #close()} stops it.
 *
 * <p>Thread-safety: {@code m_Thread} and {@code m_Cursor} are shared between the watcher
 * thread and callers of {@link #close()}, so both are {@code volatile}; the retry sleep uses
 * this object's monitor so that {@code close()} can wake it.
 */
public class MongodbWatcher implements Runnable {
    private static final Logger _Logger = LoggerFactory.getLogger(MongodbWatcher.class);

    /** Callback that supplies the collection to watch and receives the change events. */
    DocumentChange m_Change;

    /** Worker thread; {@code null} means "not started" or "stop requested" (see {@link #close()}). */
    volatile Thread m_Thread;

    /** Consecutive failure count used to scale the retry delay; touched only by the watcher thread. */
    int m_ErrorNum;

    /** Cursor of the currently open change stream, or {@code null} when none is open. */
    volatile MongoCursor<ChangeStreamDocument<Document>> m_Cursor;

    /**
     * Receiver of change events, which also names the watcher and supplies the collection to watch.
     */
    public interface DocumentChange extends Nameable {
        /**
         * Called on the watcher thread for every event read from the change stream.
         *
         * @param changeStreamDocument the event as delivered by the driver
         */
        void onChange(ChangeStreamDocument<Document> changeStreamDocument);

        /**
         * @return the collection whose changes should be watched
         */
        MongoCollection<Document> getCollection();
    }

    /**
     * Creates the watcher and immediately starts its background thread.
     *
     * @param documentChange callback providing the collection and receiving the events
     */
    public MongodbWatcher(DocumentChange documentChange) {
        this.m_Change = documentChange;
        start();
    }

    /** Starts the worker thread unless one has already been created. */
    private void start() {
        if (null != this.m_Thread) {
            return;
        }
        this.m_Thread = new Thread(this, "mongodbwatcher-" + this.m_Change.getName());
        this.m_Thread.start();
    }

    /**
     * Watcher thread body: runs {@link #doLoop()} until {@link #close()} is called, the thread
     * is interrupted, or change streams turn out to be unsupported. Any other failure is logged
     * and retried after a delay that grows with the number of consecutive failures.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (null != this.m_Thread) {
            try {
                doLoop();
            } catch (InterruptedException e) {
                return;
            } catch (Throwable th) {
                if (null == this.m_Thread) {
                    // close() was requested; a failure here is the expected result of the
                    // cursor being closed underneath us, not a monitoring error.
                    return;
                }
                int attempt = ++this.m_ErrorNum;
                // NOTE(review): the message announces seconds while the delay below is
                // attempt * MONGOCLIENT_DEFAULT_CONNECTTIMEOUT_MS; confirm the constant's value.
                _Logger.error("监控程序异常," + attempt + "秒后重试", th);
                synchronized (this) {
                    if (null == this.m_Thread) {
                        // close() ran before we got the monitor; its notifyAll() is already gone.
                        return;
                    }
                    try {
                        wait(attempt * MongodbUtil.MONGOCLIENT_DEFAULT_CONNECTTIMEOUT_MS);
                    } catch (InterruptedException e2) {
                        return;
                    }
                }
            }
        }
    }

    /**
     * Opens one change stream (with full-document lookup on updates) and dispatches its events
     * until the stream ends, fails, or a stop is requested. The cursor is always closed on exit.
     *
     * @throws InterruptedException when the server reports that change streams are only
     *         supported on replica sets; {@link #run()} treats this as a normal, final stop
     */
    private void doLoop() throws InterruptedException {
        try {
            ChangeStreamIterable<Document> watch = this.m_Change.getCollection().watch();
            MongoCursor<ChangeStreamDocument<Document>> cursor =
                    watch.fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
            this.m_Cursor = cursor;
            if (null == this.m_Thread) {
                // close() may have run before the cursor was published and therefore missed it;
                // leave now so the finally block closes it instead of blocking in hasNext().
                return;
            }
            while (null != this.m_Thread && cursor.hasNext()) {
                try {
                    ChangeStreamDocument<Document> changeStreamDocument = cursor.next();
                    // An event arrived, so the stream is healthy again: restart the back-off.
                    this.m_ErrorNum = 0;
                    if (null != this.m_Change) {
                        this.m_Change.onChange(changeStreamDocument);
                    }
                } catch (Throwable th) {
                    // A failing listener must not terminate the watch loop.
                    _Logger.error("变化通知异常", th);
                }
            }
        } catch (MongoCommandException e) {
            String message = e.getMessage();
            if (null == message
                    || !message.contains("The $changeStream stage is only supported on replica sets")) {
                throw e;
            }
            _Logger.error("非副本集数据库无法支持变化监听,将导致[" + this.m_Change.getName() + "]Reload功能失效");
            throw new InterruptedException("无法支持变化监听,正常中止");
        } finally {
            MongoCursor<ChangeStreamDocument<Document>> cursor = this.m_Cursor;
            this.m_Cursor = null;
            if (null != cursor) {
                cursor.close();
            }
        }
    }

    /**
     * Requests the watcher to stop: clears the run flag, wakes the watcher thread if it is
     * sleeping between retries, and closes the open cursor (if any) so that the watcher's
     * pending cursor call stops waiting for further events.
     */
    public void close() {
        this.m_Thread = null;
        synchronized (this) {
            notifyAll();
        }
        // Snapshot the field: the watcher thread nulls it in its finally block.
        MongoCursor<ChangeStreamDocument<Document>> cursor = this.m_Cursor;
        if (null != cursor) {
            cursor.close();
        }
    }
}
