package io.debezium.connector.mysql;

import io.debezium.connector.mysql.RecordMakers;
import io.debezium.function.Predicates;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

/* loaded from: input_file:io/debezium/connector/mysql/SnapshotReader.class */
public class SnapshotReader extends AbstractReader {
    private boolean minimalBlocking;
    private RecordRecorder recorder;
    private volatile Thread thread;
    private volatile Runnable onSuccessfulCompletion;

    /* loaded from: input_file:io/debezium/connector/mysql/SnapshotReader$RecordRecorder.class */
    protected interface RecordRecorder {
        void recordRow(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException;
    }

    public SnapshotReader(MySqlTaskContext mySqlTaskContext) {
        super(mySqlTaskContext);
        this.minimalBlocking = true;
        this.recorder = this::recordRowAsRead;
    }

    public SnapshotReader onSuccessfulCompletion(Runnable runnable) {
        this.onSuccessfulCompletion = runnable;
        return this;
    }

    public SnapshotReader useMinimalBlocking(boolean z) {
        this.minimalBlocking = z;
        return this;
    }

    public SnapshotReader generateReadEvents() {
        this.recorder = this::recordRowAsRead;
        return this;
    }

    public SnapshotReader generateInsertEvents() {
        this.recorder = this::recordRowAsInsert;
        return this;
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doStart() {
        this.thread = new Thread(this::execute, "mysql-snapshot-" + this.context.serverName());
        this.thread.start();
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doStop() {
        this.thread.interrupt();
    }

    @Override // io.debezium.connector.mysql.AbstractReader
    protected void doCleanup() {
        this.thread = null;
        this.logger.trace("Completed writing all snapshot records");
        try {
            if (this.onSuccessfulCompletion != null) {
                this.onSuccessfulCompletion.run();
            }
        } catch (Throwable th) {
            this.logger.error("Error calling completion function after completing snapshot");
        }
    }

    protected void execute() {
        this.context.configureLoggingContext(SourceInfo.SNAPSHOT_KEY);
        this.logger.info("Starting snapshot");
        AtomicReference atomicReference = new AtomicReference();
        JdbcConnection jdbc = this.context.jdbc();
        MySqlSchema dbSchema = this.context.dbSchema();
        Filters filters = dbSchema.filters();
        SourceInfo source = this.context.source();
        Clock clock = this.context.clock();
        long currentTimeInMillis = clock.currentTimeInMillis();
        try {
            this.logger.info("Step 0: disabling autocommit and enabling repeatable read transactions");
            jdbc.setAutoCommit(false);
            atomicReference.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
            jdbc.execute(new String[]{(String) atomicReference.get()});
            this.logger.info("Step 1: start transaction with consistent snapshot");
            atomicReference.set("START TRANSACTION WITH CONSISTENT SNAPSHOT");
            jdbc.execute(new String[]{(String) atomicReference.get()});
            long currentTimeInMillis2 = clock.currentTimeInMillis();
            this.logger.info("Step 2: flush and obtain global read lock (preventing writes to database)");
            atomicReference.set("FLUSH TABLES WITH READ LOCK");
            jdbc.execute(new String[]{(String) atomicReference.get()});
            this.logger.info("Step 3: read binlog position of MySQL master");
            atomicReference.set("SHOW MASTER STATUS");
            jdbc.query((String) atomicReference.get(), resultSet -> {
                if (resultSet.next()) {
                    source.setBinlogFilename(resultSet.getString(1));
                    source.setBinlogPosition(resultSet.getLong(2));
                    source.setGtidSet(resultSet.getString(5));
                    source.startSnapshot();
                }
            });
            this.logger.info("Step 4: read list of available databases");
            ArrayList<String> arrayList = new ArrayList();
            atomicReference.set("SHOW DATABASES");
            jdbc.query((String) atomicReference.get(), resultSet2 -> {
                while (resultSet2.next()) {
                    arrayList.add(resultSet2.getString(1));
                }
            });
            this.logger.info("Step 5: read list of available tables in each database");
            ArrayList<TableId> arrayList2 = new ArrayList();
            HashMap hashMap = new HashMap();
            for (String str : arrayList) {
                atomicReference.set("SHOW TABLES IN " + str);
                jdbc.query((String) atomicReference.get(), resultSet3 -> {
                    while (resultSet3.next()) {
                        TableId tableId = new TableId(str, (String) null, resultSet3.getString(1));
                        if (filters.tableFilter().test(tableId)) {
                            arrayList2.add(tableId);
                            ((List) hashMap.computeIfAbsent(str, str2 -> {
                                return new ArrayList();
                            })).add(tableId);
                        }
                    }
                });
            }
            this.logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas");
            ArrayList arrayList3 = new ArrayList();
            HashSet hashSet = new HashSet(dbSchema.tables().tableIds());
            hashSet.addAll(arrayList2);
            hashSet.forEach(tableId -> {
                arrayList3.add("DROP TABLE IF EXISTS " + tableId);
            });
            Stream map = dbSchema.tables().tableIds().stream().map((v0) -> {
                return v0.catalog();
            });
            arrayList.getClass();
            map.filter(Predicates.not((v1) -> {
                return r1.contains(v1);
            })).forEach(str2 -> {
                arrayList3.add("DROP DATABASE IF EXISTS " + str2);
            });
            for (Map.Entry entry : hashMap.entrySet()) {
                String str3 = (String) entry.getKey();
                arrayList3.add("DROP DATABASE IF EXISTS " + str3);
                arrayList3.add("CREATE DATABASE " + str3);
                arrayList3.add("USE " + str3);
                Iterator it = ((List) entry.getValue()).iterator();
                while (it.hasNext()) {
                    atomicReference.set("SHOW CREATE TABLE " + ((TableId) it.next()));
                    jdbc.query((String) atomicReference.get(), resultSet4 -> {
                        if (resultSet4.next()) {
                            arrayList3.add(resultSet4.getString(2));
                        }
                    });
                }
            }
            this.logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
            dbSchema.applyDdl(source, null, String.join(";" + System.lineSeparator(), arrayList3), this::enqueueSchemaChanges);
            this.context.makeRecord().regenerate();
            boolean z = false;
            if (this.minimalBlocking) {
                this.logger.info("Step 7: releasing global read lock to enable MySQL writes");
                atomicReference.set("UNLOCK TABLES");
                jdbc.execute(new String[]{(String) atomicReference.get()});
                z = true;
                this.logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis2));
            }
            this.logger.info("Step 8: scanning contents of {} tables", Integer.valueOf(arrayList2.size()));
            long currentTimeInMillis3 = clock.currentTimeInMillis();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            int i = 0;
            for (TableId tableId2 : arrayList2) {
                long currentTimeInMillis4 = clock.currentTimeInMillis();
                i++;
                this.logger.debug("Step 8.{}: scanning table '{}'; {} tables remain", new Object[]{Integer.valueOf(i), tableId2, Integer.valueOf(arrayList2.size() - i)});
                atomicReference.set("SELECT * FROM " + tableId2);
                jdbc.query((String) atomicReference.get(), resultSet5 -> {
                    RecordMakers.RecordsForTable forTable = this.context.makeRecord().forTable(tableId2, (BitSet) null, sourceRecord -> {
                        super.enqueueRecord(sourceRecord);
                    });
                    if (forTable != null) {
                        try {
                            int size = dbSchema.tableFor(tableId2).columns().size();
                            Object[] objArr = new Object[size];
                            while (resultSet5.next()) {
                                int i2 = 0;
                                int i3 = 1;
                                while (i2 != size) {
                                    objArr[i2] = resultSet5.getObject(i3);
                                    i2++;
                                    i3++;
                                }
                                this.recorder.recordRow(forTable, objArr, currentTimeInMillis);
                            }
                        } catch (InterruptedException e) {
                            Thread.interrupted();
                            this.logger.info("Stopping the snapshot after thread interruption");
                            atomicBoolean.set(true);
                        }
                    }
                });
                if (atomicBoolean.get()) {
                    break;
                }
                this.logger.info("Step 8.{}: scanned table '{}' in {}", new Object[]{Integer.valueOf(i), tableId2, Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis4)});
            }
            this.logger.info("Step 8: scanned contents of {} tables in {}", Integer.valueOf(arrayList2.size()), Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis3));
            if (!z) {
                this.logger.info("Step 9: releasing global read lock to enable MySQL writes");
                atomicReference.set("UNLOCK TABLES");
                jdbc.execute(new String[]{(String) atomicReference.get()});
                this.logger.info("Writes to MySQL prevented for a total of {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis2));
            }
            if (atomicBoolean.get()) {
                this.logger.info("Step 10: rolling back transaction after request to stop");
                atomicReference.set("ROLLBACK");
                jdbc.execute(new String[]{(String) atomicReference.get()});
                return;
            }
            this.logger.info("Step 10: committing transaction");
            atomicReference.set("COMMIT");
            jdbc.execute(new String[]{(String) atomicReference.get()});
            try {
                this.logger.info("Step 11: recording completion of snapshot");
                source.completeSnapshot();
                dbSchema.applyDdl(source, null, "", this::enqueueSchemaChanges);
                super.completeSuccessfully();
                this.logger.info("Completed snapshot in {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis));
            } catch (Throwable th) {
                super.completeSuccessfully();
                this.logger.info("Completed snapshot in {}", Strings.duration(clock.currentTimeInMillis() - currentTimeInMillis));
                throw th;
            }
        } catch (Throwable th2) {
            failed(th2, "Aborting snapshot after running '" + ((String) atomicReference.get()) + "': " + th2.getMessage());
        }
    }

    protected void enqueueSchemaChanges(String str, String str2) {
        if (!this.context.includeSchemaChangeRecords() || this.context.makeRecord().schemaChanges(str, str2, sourceRecord -> {
            super.enqueueRecord(sourceRecord);
        }) <= 0) {
            return;
        }
        this.logger.debug("Recorded DDL statements for database '{}': {}", str, str2);
    }

    protected void recordRowAsRead(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException {
        recordsForTable.read(objArr, j);
    }

    protected void recordRowAsInsert(RecordMakers.RecordsForTable recordsForTable, Object[] objArr, long j) throws InterruptedException {
        recordsForTable.create(objArr, j);
    }
}
