package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.data.Envelope;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.util.PGmoney;

@ThreadSafe
/* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducer.class */
public class RecordsSnapshotProducer extends RecordsProducer {
    private static final String CONTEXT_NAME = "records-snapshot-producer";
    private final ExecutorService executorService;
    private final Optional<RecordsStreamProducer> streamProducer;
    private AtomicReference<SourceRecord> currentRecord;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RecordsSnapshotProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo, boolean z) {
        super(postgresTaskContext, sourceInfo);
        this.executorService = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "records-snapshot-producer-thread");
        });
        this.currentRecord = new AtomicReference<>();
        if (z) {
            this.streamProducer = Optional.of(new RecordsStreamProducer(postgresTaskContext, sourceInfo));
        } else {
            this.streamProducer = Optional.empty();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void start(Consumer<SourceRecord> consumer) {
        LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            CompletableFuture.runAsync(() -> {
                takeSnapshot(consumer);
            }, this.executorService).thenRun(() -> {
                startStreaming(consumer);
            }).exceptionally(this::handleException);
        } finally {
            configureLoggingContext.restore();
        }
    }

    private Void handleException(Throwable th) {
        this.logger.error("unexpected exception", th.getCause() != null ? th.getCause() : th);
        stop();
        return null;
    }

    private void startStreaming(Consumer<SourceRecord> consumer) {
        try {
            this.streamProducer.ifPresent(recordsStreamProducer -> {
                this.logger.info("Snapshot finished, continuing streaming changes from {}", ReplicationConnection.format(this.sourceInfo.lsn().longValue()));
                recordsStreamProducer.start(consumer);
            });
        } finally {
            cleanup();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void commit() {
        this.streamProducer.ifPresent((v0) -> {
            v0.commit();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void stop() {
        try {
            this.streamProducer.ifPresent((v0) -> {
                v0.stop();
            });
        } finally {
            cleanup();
        }
    }

    private void cleanup() {
        this.currentRecord.set(null);
        this.executorService.shutdownNow();
    }

    private void takeSnapshot(Consumer<SourceRecord> consumer) {
        long currentTimeInMillis = clock().currentTimeInMillis();
        Connection connection = null;
        try {
            PostgresConnection createConnection = this.taskContext.createConnection();
            Throwable th = null;
            try {
                try {
                    connection = createConnection.connection();
                    String lineSeparator = System.lineSeparator();
                    this.logger.info("Step 0: disabling autocommit");
                    createConnection.setAutoCommit(false);
                    long snapshotLockTimeoutMillis = this.taskContext.config().snapshotLockTimeoutMillis();
                    this.logger.info("Step 1: starting transaction and refreshing the DB schemas for database '{}' and user '{}'", createConnection.database(), createConnection.username());
                    StringBuilder sb = new StringBuilder("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;");
                    createConnection.executeWithoutCommitting(sb.toString());
                    sb.delete(0, sb.length());
                    PostgresSchema schema = schema();
                    schema.refresh(createConnection, false);
                    this.logger.info("Step 2: locking each of the database tables, waiting a maximum of '{}' seconds for each lock", Double.valueOf(snapshotLockTimeoutMillis / 1000.0d));
                    sb.append("SET lock_timeout = ").append(snapshotLockTimeoutMillis).append(";").append(lineSeparator);
                    schema.tables().forEach(tableId -> {
                        sb.append("LOCK TABLE ").append(tableId.toString()).append(" IN SHARE UPDATE EXCLUSIVE MODE;").append(lineSeparator);
                    });
                    createConnection.executeWithoutCommitting(sb.toString());
                    schema.refresh(createConnection, false);
                    long currentXLogLocation = createConnection.currentXLogLocation();
                    int intValue = createConnection.currentTransactionId().intValue();
                    this.logger.info("\t read xlogStart at '{}' from transaction '{}'", ReplicationConnection.format(currentXLogLocation), Integer.valueOf(intValue));
                    this.sourceInfo.startSnapshot();
                    this.sourceInfo.update(Long.valueOf(currentXLogLocation), Long.valueOf(clock().currentTimeInMicros()), Integer.valueOf(intValue));
                    this.logger.info("Step 3: reading and exporting the contents of each table");
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    schema.tables().forEach(tableId2 -> {
                        if (schema.isFilteredOut(tableId2)) {
                            this.logger.info("\t table '{}' is filtered out, ignoring", tableId2);
                            return;
                        }
                        long currentTimeInMillis2 = clock().currentTimeInMillis();
                        this.logger.info("\t exporting data from table '{}'", tableId2);
                        try {
                            createConnection.query("SELECT * FROM " + tableId2, this::readTableStatement, resultSet -> {
                                readTable(tableId2, resultSet, consumer, atomicInteger);
                            });
                            this.logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", new Object[]{Integer.valueOf(atomicInteger.get()), tableId2, Strings.duration(clock().currentTimeInMillis() - currentTimeInMillis2)});
                            atomicInteger.set(0);
                        } catch (SQLException e) {
                            throw new ConnectException(e);
                        }
                    });
                    this.logger.info("Step 4: committing transaction '{}'", Integer.valueOf(intValue));
                    connection.commit();
                    this.logger.info("Step 5: sending the last snapshot record");
                    SourceRecord sourceRecord = this.currentRecord.get();
                    if (sourceRecord != null) {
                        this.sourceInfo.markLastSnapshotRecord();
                        this.currentRecord.set(new SourceRecord(sourceRecord.sourcePartition(), this.sourceInfo.offset(), sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), sourceRecord.valueSchema(), sourceRecord.value()));
                        sendCurrentRecord(consumer);
                    }
                    this.sourceInfo.completeSnapshot();
                    this.logger.info("Snapshot completed in '{}'", Strings.duration(clock().currentTimeInMillis() - currentTimeInMillis));
                    if (createConnection != null) {
                        if (0 != 0) {
                            try {
                                createConnection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            createConnection.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (SQLException e) {
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException e2) {
                    this.logger.error("Cannot rollback snapshot transaction", e2);
                    throw new ConnectException(e);
                }
            }
            throw new ConnectException(e);
        }
    }

    private Statement readTableStatement(Connection connection) throws SQLException {
        int rowsFetchSize = this.taskContext.config().rowsFetchSize();
        Statement createStatement = connection.createStatement();
        createStatement.setFetchSize(rowsFetchSize);
        return createStatement;
    }

    private void readTable(TableId tableId, ResultSet resultSet, Consumer<SourceRecord> consumer, AtomicInteger atomicInteger) throws SQLException {
        Table tableFor = schema().tableFor(tableId);
        if (!$assertionsDisabled && tableFor == null) {
            throw new AssertionError();
        }
        int size = tableFor.columns().size();
        Object[] objArr = new Object[size];
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            atomicInteger.incrementAndGet();
            sendCurrentRecord(consumer);
            int i = 0;
            int i2 = 1;
            while (i != size) {
                objArr[i] = valueForColumn(resultSet, i2, metaData);
                i++;
                i2++;
            }
            generateReadRecord(tableId, objArr);
        }
    }

    private Object valueForColumn(ResultSet resultSet, int i, ResultSetMetaData resultSetMetaData) throws SQLException {
        try {
            switch (PgOid.valueOf(resultSetMetaData.getColumnTypeName(i))) {
                case 790:
                    return Double.valueOf(new PGmoney(resultSet.getString(i)).val);
                case 1560:
                    return resultSet.getString(i);
                default:
                    return resultSet.getObject(i);
            }
        } catch (SQLException e) {
            return resultSet.getObject(i);
        }
    }

    protected void generateReadRecord(TableId tableId, Object[] objArr) {
        if (objArr.length == 0) {
            return;
        }
        TableSchema schemaFor = schema().schemaFor(tableId);
        if (!$assertionsDisabled && schemaFor == null) {
            throw new AssertionError();
        }
        Object keyFromColumnData = schemaFor.keyFromColumnData(objArr);
        Struct valueFromColumnData = schemaFor.valueFromColumnData(objArr);
        if (keyFromColumnData == null || valueFromColumnData == null) {
            return;
        }
        Schema keySchema = schemaFor.keySchema();
        this.sourceInfo.update(Long.valueOf(clock().currentTimeInMicros()));
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String str = topicSelector().topicNameFor(tableId);
        Envelope createEnvelope = createEnvelope(schemaFor, str);
        this.currentRecord.set(new SourceRecord(partition, offset, str, (Integer) null, keySchema, keyFromColumnData, createEnvelope.schema(), createEnvelope.read(valueFromColumnData, this.sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis()))));
    }

    private void sendCurrentRecord(Consumer<SourceRecord> consumer) {
        SourceRecord sourceRecord = this.currentRecord.get();
        if (sourceRecord == null) {
            return;
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending read event '{}'", sourceRecord);
        }
        consumer.accept(sourceRecord);
    }

    static {
        $assertionsDisabled = !RecordsSnapshotProducer.class.desiredAssertionStatus();
    }
}
