package cn.weforward.data.mysql.util;

import cn.weforward.common.util.StringUtil;
import cn.weforward.data.mysql.EntityListener;
import cn.weforward.data.mysql.EntityWatcher;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/weforward/data/mysql/util/CanalWather.class */
public class CanalWather implements Runnable, EntityWatcher {
    protected static final Logger _Logger = LoggerFactory.getLogger(CanalWather.class);
    protected CanalConnector m_Connector;
    protected SocketAddress m_Address;
    protected String m_Username;
    protected String m_Password;
    protected String m_Destination;
    protected String m_Database;
    protected String m_TableName;
    protected String m_Filter = ".*\\..*";
    List<EntityListener> m_Listeners = new ArrayList();
    Thread m_Thread;
    int m_ErrorNum;

    public CanalWather(String str, int i, String str2, String str3, String str4) {
        this.m_Address = new InetSocketAddress(str, i);
        this.m_Destination = str2;
        this.m_Username = str3;
        this.m_Password = str4;
    }

    public void setDatabase(String str) {
        setFilter(str + "\\..*");
    }

    public void setFilter(String str) {
        this.m_Filter = str;
    }

    public void start() {
        if (null != this.m_Thread) {
            return;
        }
        this.m_Thread = new Thread(this, "canalwatcher-" + this.m_Database + "/" + this.m_TableName);
        this.m_Thread.start();
    }

    public void stop() {
        if (null != this.m_Connector) {
            this.m_Connector.stopRunning();
            this.m_Connector = null;
        }
        Thread thread = this.m_Thread;
        this.m_Thread = null;
        thread.interrupt();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (null != this.m_Thread) {
            try {
                doLoop();
            } catch (InterruptedException e) {
                return;
            } catch (Throwable th) {
                Logger logger = _Logger;
                StringBuilder append = new StringBuilder().append("监控程序异常,");
                int i = this.m_ErrorNum + 1;
                this.m_ErrorNum = i;
                logger.error(append.append(i).append("秒后重试").toString(), th);
                synchronized (this) {
                    try {
                        wait(this.m_ErrorNum * 1000);
                    } catch (InterruptedException e2) {
                        return;
                    }
                }
            }
        }
    }

    private void doLoop() throws InterruptedException {
        while (null != this.m_Thread) {
            this.m_Connector = CanalConnectors.newSingleConnector(this.m_Address, this.m_Destination, this.m_Username, this.m_Password);
            try {
                try {
                    this.m_Connector.connect();
                    this.m_Connector.subscribe(this.m_Filter);
                    this.m_Connector.rollback();
                    while (null != this.m_Thread) {
                        for (CanalEntry.Entry entry : this.m_Connector.get(1, 10L, TimeUnit.SECONDS).getEntries()) {
                            if (entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONBEGIN && entry.getEntryType() != CanalEntry.EntryType.TRANSACTIONEND) {
                                String schemaName = entry.getHeader().getSchemaName();
                                String tableName = entry.getHeader().getTableName();
                                try {
                                    CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                                    CanalEntry.EventType eventType = parseFrom.getEventType();
                                    if (eventType == CanalEntry.EventType.DELETE) {
                                        Iterator it = parseFrom.getRowDatasList().iterator();
                                        while (it.hasNext()) {
                                            onChange(schemaName, tableName, changeEntiy(3, ((CanalEntry.RowData) it.next()).getBeforeColumnsList()));
                                        }
                                    } else if (eventType == CanalEntry.EventType.INSERT) {
                                        Iterator it2 = parseFrom.getRowDatasList().iterator();
                                        while (it2.hasNext()) {
                                            onChange(schemaName, tableName, changeEntiy(1, ((CanalEntry.RowData) it2.next()).getAfterColumnsList()));
                                        }
                                    } else if (eventType == CanalEntry.EventType.UPDATE) {
                                        Iterator it3 = parseFrom.getRowDatasList().iterator();
                                        while (it3.hasNext()) {
                                            onChange(schemaName, tableName, changeEntiy(2, ((CanalEntry.RowData) it3.next()).getAfterColumnsList()));
                                        }
                                    }
                                } catch (Exception e) {
                                    _Logger.warn("忽略解析异常 data:" + entry.toString(), e);
                                }
                            }
                        }
                    }
                    this.m_Connector.disconnect();
                    this.m_Connector = null;
                } catch (Throwable th) {
                    _Logger.warn("监控异常", th);
                    this.m_Connector.disconnect();
                    this.m_Connector = null;
                }
            } catch (Throwable th2) {
                this.m_Connector.disconnect();
                this.m_Connector = null;
                throw th2;
            }
        }
    }

    private void onChange(String str, String str2, EntityListener.ChangeEntity changeEntity) {
        for (EntityListener entityListener : this.m_Listeners) {
            if (null != entityListener && (StringUtil.isEmpty(entityListener.getDatabase()) || StringUtil.eq(entityListener.getDatabase(), str))) {
                if (StringUtil.isEmpty(entityListener.getTabelName()) || StringUtil.eq(entityListener.getTabelName(), str2)) {
                    try {
                        entityListener.onChange(changeEntity);
                    } catch (Throwable th) {
                        _Logger.warn("忽略通知" + entityListener + "出错", th);
                    }
                }
            }
        }
    }

    private EntityListener.ChangeEntity changeEntiy(int i, List<CanalEntry.Column> list) {
        HashMap hashMap = new HashMap(list.size());
        for (CanalEntry.Column column : list) {
            hashMap.put(column.getName(), column.getValue());
        }
        return new EntityListener.ChangeEntity(i, hashMap);
    }

    @Override // cn.weforward.data.mysql.EntityWatcher
    public synchronized void register(EntityListener entityListener) {
        if (null == entityListener) {
            return;
        }
        List<EntityListener> list = this.m_Listeners;
        ArrayList arrayList = new ArrayList(list.size() + 1);
        for (EntityListener entityListener2 : list) {
            if (entityListener2.equals(entityListener)) {
                return;
            } else {
                arrayList.add(entityListener2);
            }
        }
        arrayList.add(entityListener);
        this.m_Listeners = arrayList;
        start();
    }

    @Override // cn.weforward.data.mysql.EntityWatcher
    public synchronized void unRegister(EntityListener entityListener) {
        if (null == entityListener) {
            return;
        }
        List<EntityListener> list = this.m_Listeners;
        ArrayList arrayList = new ArrayList(list.size() - 1);
        boolean z = false;
        for (EntityListener entityListener2 : list) {
            if (entityListener2.equals(entityListener)) {
                z = true;
            } else {
                arrayList.add(entityListener2);
            }
        }
        if (z) {
            this.m_Listeners = arrayList;
            if (arrayList.isEmpty()) {
                stop();
            }
        }
    }
}
