package cn.langpy.dblistener.core;

import cn.langpy.dblistener.core.model.DbType;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/* loaded from: input_file:cn/langpy/dblistener/core/ListenEngine.class */
/**
 * Starts one embedded Debezium engine per configured database and hands the captured change
 * events to a {@code ChangeConsumer} built from the registered {@link EventListener}s.
 *
 * <p>Instances are created through {@link #builder()}. Call {@link #start()} to launch the
 * engines in the background, or {@link #start(boolean) start(true)} / {@link #run()} to launch
 * them and block until every engine has stopped.
 */
public class ListenEngine implements Runnable {
    // The logger name is Class#toString() (it carries the "class " prefix); it is kept as-is so
    // that any logging configuration keyed on the existing name keeps working.
    private static final Logger log = Logger.getLogger(ListenEngine.class.toString());

    private final Executor executor;
    private final Integer parseQueueSize;
    private final Integer maxBatchSize;
    private final Integer parseThreadSize;
    private final Integer processThreadSize;
    private final DbType type;
    private final String driver;
    private final String host;
    private final Integer port;
    private final String username;
    private final String password;
    private final List<String> databases;
    private final List<String> tables;
    private final List<String> tablesExclude;
    private final Boolean initialize;
    private final List<EventListener> listeners;

    /** Debezium connector class name for each supported database type. */
    final Map<DbType, String> connectorMap = defaultConnectors();

    /** Builds the mapping from database type to Debezium connector class name. */
    private static Map<DbType, String> defaultConnectors() {
        Map<DbType, String> connectors = new HashMap<>();
        connectors.put(DbType.Mysql, "io.debezium.connector.mysql.MySqlConnector");
        connectors.put(DbType.Postgres, "io.debezium.connector.postgresql.PostgresConnector");
        return connectors;
    }

    /**
     * Fluent builder for {@link ListenEngine}. Every setter returns the builder itself so calls
     * can be chained; {@link #build()} creates the engine.
     */
    public static class Builder {
        private DbType type;
        private String driver;
        private String host;
        private Integer port;
        private String username;
        private String password;
        private List<String> databases;
        private List<String> tables;
        private List<String> tablesExclude;
        private Executor executor;
        private Integer parseQueueSize = 8192;
        private Integer maxBatchSize = 2048;
        private Integer parseThreadSize = 2;
        private Integer processThreadSize = 2;
        private final List<EventListener> listeners = new ArrayList<>();
        private Boolean initialize = true;

        /**
         * Sets the JDBC driver class name written to {@code database.jdbc.driver}. When left
         * unset, {@code com.mysql.cj.jdbc.Driver} is used.
         */
        public Builder driver(String driver) {
            this.driver = driver;
            return this;
        }

        /**
         * Selects the snapshot mode: {@code true} (the default) maps to {@code initial},
         * {@code false} maps to {@code no_data}.
         */
        public Builder initialize(Boolean initialize) {
            this.initialize = initialize;
            return this;
        }

        /**
         * Sets the executor passed to the {@code ChangeConsumer}. When left unset, a fixed
         * thread pool of {@code processThreadSize} threads is created.
         */
        public Builder executor(Executor executor) {
            this.executor = executor;
            return this;
        }

        /** Sets the table exclusion list that is passed to the {@code ChangeConsumer}. */
        public Builder tablesExclude(List<String> tablesExclude) {
            this.tablesExclude = tablesExclude;
            return this;
        }

        /** Registers a single listener. */
        public Builder listener(EventListener eventListener) {
            this.listeners.add(eventListener);
            return this;
        }

        /** Registers every listener in the given list, in addition to those already added. */
        public Builder listeners(List<? extends EventListener> listeners) {
            this.listeners.addAll(listeners);
            return this;
        }

        /** Sets the value written to {@code max.queue.size} (default 8192). */
        public Builder parseQueueSize(Integer parseQueueSize) {
            this.parseQueueSize = parseQueueSize;
            return this;
        }

        /** Sets the value written to {@code max.batch.size} (default 2048). */
        public Builder maxBatchSize(Integer maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        /**
         * Sets the value written to both {@code record.processing.threads} and
         * {@code snapshot.max.threads} (default 2).
         */
        public Builder parseThreadSize(Integer parseThreadSize) {
            this.parseThreadSize = parseThreadSize;
            return this;
        }

        /**
         * Sets the size of the default fixed thread pool (default 2). Only used when no
         * {@link #executor(Executor) executor} is supplied.
         */
        public Builder processThreadSize(Integer processThreadSize) {
            this.processThreadSize = processThreadSize;
            return this;
        }

        /** Sets the database type, which selects the Debezium connector class. */
        public Builder type(DbType dbType) {
            this.type = dbType;
            return this;
        }

        /** Sets the value written to {@code database.hostname}. */
        public Builder host(String host) {
            this.host = host;
            return this;
        }

        /** Sets the value written to {@code database.port}. */
        public Builder port(Integer port) {
            this.port = port;
            return this;
        }

        /** Sets the value written to {@code database.user}. */
        public Builder username(String username) {
            this.username = username;
            return this;
        }

        /** Sets the value written to {@code database.password}. */
        public Builder password(String password) {
            this.password = password;
            return this;
        }

        /** Sets the databases to listen to; one engine is created per entry. */
        public Builder databases(List<String> databases) {
            this.databases = databases;
            return this;
        }

        /** Sets the tables joined into {@code table.include.list}; may be left unset. */
        public Builder tables(List<String> tables) {
            this.tables = tables;
            return this;
        }

        /** Creates a {@link ListenEngine} from the values collected so far. */
        public ListenEngine build() {
            return new ListenEngine(this);
        }
    }

    /**
     * Copies the builder's settings into a new engine.
     *
     * @param builder source of all settings; its listener list is copied, the other lists are
     *                shared by reference
     */
    public ListenEngine(Builder builder) {
        this.parseQueueSize = builder.parseQueueSize;
        this.maxBatchSize = builder.maxBatchSize;
        this.parseThreadSize = builder.parseThreadSize;
        this.processThreadSize = builder.processThreadSize;
        this.type = builder.type;
        this.host = builder.host;
        this.port = builder.port;
        this.username = builder.username;
        this.password = builder.password;
        this.databases = builder.databases;
        this.tables = builder.tables;
        this.tablesExclude = builder.tablesExclude;
        this.listeners = new ArrayList<>(builder.listeners);
        // Honour a caller-supplied executor; only fall back to a private pool when none is given.
        this.executor = builder.executor != null
                ? builder.executor
                : Executors.newFixedThreadPool(this.processThreadSize.intValue());
        this.initialize = builder.initialize;
        this.driver = builder.driver;
    }

    /** Returns a new {@link Builder} with default settings. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builds (but does not start) a Debezium engine for one database and registers a JVM
     * shutdown hook that closes it.
     *
     * @param database name written to {@code database.include.list}
     * @return the configured engine; the caller is responsible for running it
     */
    public DebeziumEngine<ChangeEvent<String, String>> create(String database) {
        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(createConfig(database))
                .notifying(new ChangeConsumer(this.type, this.listeners, this.tablesExclude, this.executor))
                .using((success, message, error) -> {
                    // The completion callback also fires on a normal stop; only report real
                    // failures as errors.
                    if (success) {
                        log.info("【DBListener】=>监听结束：" + message);
                        return;
                    }
                    log.severe("【DBListener】=>监听错误：" + error);
                    log.severe("【DBListener】=>监听错误：" + message);
                })
                .build();
        closeOnExit(engine);
        return engine;
    }

    /** Registers a JVM shutdown hook that closes the given engine. */
    private void closeOnExit(DebeziumEngine<ChangeEvent<String, String>> debeziumEngine) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                debeziumEngine.close();
            } catch (IOException e) {
                // Deliberately written to stderr: java.util.logging may already have been
                // reset by its own shutdown hook when this runs.
                e.printStackTrace();
            }
        }));
    }

    /**
     * Starts an engine for every configured database and blocks until all of them have stopped.
     * Equivalent to {@link #start(boolean) start(true)}.
     */
    @Override
    public void run() {
        start(true);
    }

    /** Starts an engine for every configured database without blocking the caller. */
    public void start() {
        start(false);
    }

    /**
     * Starts an engine for every configured database, each on its own pool thread.
     *
     * <p>If the waiting thread is interrupted, its interrupt flag is restored and the method
     * returns; the engines keep running.
     *
     * @param block {@code true} to wait until every engine has stopped, {@code false} to return
     *              as soon as the engines have been submitted
     */
    public void start(boolean block) {
        if (this.databases.isEmpty()) {
            log.warning("【DBListener】=>未配置需要监听的数据库");
            return;
        }
        ExecutorService enginePool = Executors.newFixedThreadPool(this.databases.size());
        try {
            for (String database : this.databases) {
                enginePool.execute(create(database));
            }
        } finally {
            // Nothing else will be submitted. Already-submitted engines keep running, the
            // worker threads exit once their engine stops, and awaitTermination can return.
            enginePool.shutdown();
        }
        if (!block) {
            return;
        }
        try {
            enginePool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warning("【DBListener】=>等待监听结束时被中断");
        }
    }

    /**
     * Builds the Debezium configuration for one database.
     *
     * @param database name written to {@code database.include.list}
     * @return a fresh {@link Properties} instance
     * @throws IllegalStateException if no connector is mapped for the configured database type
     */
    public Properties createConfig(String database) {
        String connectorClass = this.connectorMap.get(this.type);
        if (connectorClass == null) {
            throw new IllegalStateException("【DBListener】=>unsupported database type: " + this.type);
        }

        Properties config = new Properties();
        // NOTE(review): the suffixes below are a single random digit in [0, 9), so two engines
        // can end up with the same name / server id — confirm whether that is acceptable.
        Random random = new Random();

        // Engine identity and connector.
        config.setProperty("name", "huoyo-" + random.nextInt(9));
        config.setProperty("metrics.group-prefix", "huoyo-metric-" + random.nextInt(9));
        config.setProperty("connector.class", connectorClass);
        config.setProperty("topic.prefix", "huoyo-connector");

        // Snapshot behaviour.
        config.setProperty("snapshot.mode", this.initialize.booleanValue() ? "initial" : "no_data");
        config.setProperty("snapshot.max.threads", String.valueOf(this.parseThreadSize));
        config.setProperty("snapshot.fetch.size", "1024");
        config.setProperty("snapshot.locking.mode", "none");
        config.setProperty("snapshot.tables.order.by.row.count", "ascending");
        config.setProperty("min.row.count.to.stream.results", "200");

        // Record processing and queueing.
        config.setProperty("record.processing.threads", String.valueOf(this.parseThreadSize));
        config.setProperty("record.processing.order", "ORDERED");
        config.setProperty("max.queue.size", String.valueOf(this.parseQueueSize));
        config.setProperty("max.batch.size", String.valueOf(this.maxBatchSize));

        // Offset and history storage.
        // NOTE(review): the file paths are fixed placeholders shared by every engine — confirm.
        config.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        config.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
        config.setProperty("offset.flush.interval.ms", "0");
        config.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        config.setProperty("database.history.file.filename", "/path/to/storage/dbhistory.dat");
        config.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
        config.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");
        config.setProperty("schema.history.internal.store.only.captured.tables.ddl", "true");
        config.setProperty("schema.history.internal.store.only.captured.databases.ddl", "true");

        // Connection settings.
        config.setProperty("database.hostname", this.host);
        config.setProperty("database.port", String.valueOf(this.port));
        config.setProperty("database.user", this.username);
        config.setProperty("database.password", this.password);
        config.setProperty("database.jdbc.driver", this.driver == null ? "com.mysql.cj.jdbc.Driver" : this.driver);
        config.setProperty("database.server.id", "30438" + random.nextInt(9));
        config.setProperty("database.server.name", "hserver" + random.nextInt(9));
        config.setProperty("database.ssl.mode", "disabled");

        // Capture scope. The table list is optional on the builder, so skip it when unset.
        config.setProperty("database.include.list", database);
        if (this.tables != null) {
            config.setProperty("table.include.list", String.join(",", this.tables));
        }
        config.setProperty("table.ignore.builtin", "true");

        // Event content and type mapping.
        config.setProperty("include.schema.changes", "true");
        config.setProperty("include.schema.comments", "true");
        config.setProperty("schemas.enable", "true");
        config.setProperty("bigint.unsigned.handling.mode", "long");
        config.setProperty("decimal.handling.mode", "double");
        config.setProperty("time.precision.mode", "adaptive_time_microseconds");
        config.setProperty("binary.handling.mode", "bytes");
        return config;
    }
}
