package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.function.BlockingConsumer;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresConnectorTask.class */
public class PostgresConnectorTask extends BaseSourceTask {
    private static final String CONTEXT_NAME = "postgres-connector-task";
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean running = new AtomicBoolean(false);
    private PostgresTaskContext taskContext;
    private RecordsProducer producer;
    private volatile long lastProcessedLsn;
    private ChangeEventQueue<ChangeEvent> changeEventQueue;

    public void start(Configuration configuration) {
        if (this.running.get()) {
            return;
        }
        PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(configuration);
        PostgresConnection postgresConnection = new PostgresConnection(postgresConnectorConfig.jdbcConfig());
        Throwable th = null;
        try {
            try {
                TypeRegistry typeRegistry = postgresConnection.getTypeRegistry();
                if (postgresConnection != null) {
                    if (0 != 0) {
                        try {
                            postgresConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        postgresConnection.close();
                    }
                }
                TopicSelector<TableId> create = PostgresTopicSelector.create(postgresConnectorConfig);
                this.taskContext = new PostgresTaskContext(postgresConnectorConfig, new PostgresSchema(postgresConnectorConfig, typeRegistry, create), create);
                SourceInfo sourceInfo = new SourceInfo(postgresConnectorConfig.getLogicalName());
                Map<String, Object> offset = this.context.offsetStorageReader().offset(sourceInfo.partition());
                LoggingContext.PreviousContext configureLoggingContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
                try {
                    try {
                        PostgresConnection createConnection = this.taskContext.createConnection();
                        Throwable th3 = null;
                        try {
                            try {
                                this.logger.info(createConnection.serverInfo().toString());
                                if (createConnection != null) {
                                    if (0 != 0) {
                                        try {
                                            createConnection.close();
                                        } catch (Throwable th4) {
                                            th3.addSuppressed(th4);
                                        }
                                    } else {
                                        createConnection.close();
                                    }
                                }
                                if (offset == null) {
                                    this.logger.info("No previous offset found");
                                    if (postgresConnectorConfig.snapshotNeverAllowed()) {
                                        this.logger.info("Snapshots are not allowed as per configuration, starting streaming logical changes only");
                                        this.producer = new RecordsStreamProducer(this.taskContext, sourceInfo);
                                    } else {
                                        createSnapshotProducer(this.taskContext, sourceInfo, postgresConnectorConfig.initialOnlySnapshot());
                                    }
                                } else {
                                    sourceInfo.load(offset);
                                    this.logger.info("Found previous offset {}", sourceInfo);
                                    if (sourceInfo.isSnapshotInEffect()) {
                                        if (postgresConnectorConfig.snapshotNeverAllowed()) {
                                            throw new ConnectException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
                                        }
                                        this.logger.info("Found previous incomplete snapshot");
                                        createSnapshotProducer(this.taskContext, sourceInfo, postgresConnectorConfig.initialOnlySnapshot());
                                    } else if (postgresConnectorConfig.alwaysTakeSnapshot()) {
                                        this.logger.info("Taking a new snapshot as per configuration");
                                        this.producer = new RecordsSnapshotProducer(this.taskContext, sourceInfo, true);
                                    } else {
                                        this.logger.info("Previous snapshot has completed successfully, streaming logical changes from last known position");
                                        this.producer = new RecordsStreamProducer(this.taskContext, sourceInfo);
                                    }
                                }
                                this.changeEventQueue = new ChangeEventQueue.Builder().pollInterval(postgresConnectorConfig.getPollInterval()).maxBatchSize(postgresConnectorConfig.getMaxBatchSize()).maxQueueSize(postgresConnectorConfig.getMaxQueueSize()).loggingContextSupplier(() -> {
                                    return this.taskContext.configureLoggingContext(CONTEXT_NAME);
                                }).build();
                                RecordsProducer recordsProducer = this.producer;
                                ChangeEventQueue<ChangeEvent> changeEventQueue = this.changeEventQueue;
                                changeEventQueue.getClass();
                                BlockingConsumer<ChangeEvent> blockingConsumer = (v1) -> {
                                    r1.enqueue(v1);
                                };
                                ChangeEventQueue<ChangeEvent> changeEventQueue2 = this.changeEventQueue;
                                changeEventQueue2.getClass();
                                recordsProducer.start(blockingConsumer, changeEventQueue2::producerFailure);
                                this.running.compareAndSet(false, true);
                                configureLoggingContext.restore();
                            } finally {
                            }
                        } catch (Throwable th5) {
                            if (createConnection != null) {
                                if (th3 != null) {
                                    try {
                                        createConnection.close();
                                    } catch (Throwable th6) {
                                        th3.addSuppressed(th6);
                                    }
                                } else {
                                    createConnection.close();
                                }
                            }
                            throw th5;
                        }
                    } catch (SQLException e) {
                        throw new ConnectException(e);
                    }
                } catch (Throwable th7) {
                    configureLoggingContext.restore();
                    throw th7;
                }
            } finally {
            }
        } catch (Throwable th8) {
            if (postgresConnection != null) {
                if (th != null) {
                    try {
                        postgresConnection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    postgresConnection.close();
                }
            }
            throw th8;
        }
    }

    private void createSnapshotProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo, boolean z) {
        if (z) {
            this.logger.info("Taking only a snapshot of the DB without streaming any changes afterwards...");
            this.producer = new RecordsSnapshotProducer(postgresTaskContext, sourceInfo, false);
        } else {
            this.logger.info("Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
            this.producer = new RecordsSnapshotProducer(postgresTaskContext, sourceInfo, true);
        }
    }

    public void commit() throws InterruptedException {
        if (this.running.get()) {
            this.producer.commit(this.lastProcessedLsn);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        List poll = this.changeEventQueue.poll();
        if (poll.size() > 0) {
            int size = poll.size() - 1;
            while (true) {
                if (size < 0) {
                    break;
                }
                SourceRecord record = ((ChangeEvent) poll.get(size)).getRecord();
                if (((ChangeEvent) poll.get(size)).isLastOfLsn()) {
                    this.lastProcessedLsn = ((Long) record.sourceOffset().get(SourceInfo.LSN_KEY)).longValue();
                    break;
                }
                size--;
            }
        }
        return (List) poll.stream().map((v0) -> {
            return v0.getRecord();
        }).collect(Collectors.toList());
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            this.producer.stop();
        }
    }

    public String version() {
        return Module.version();
    }

    protected Iterable<Field> getAllConfigurationFields() {
        return PostgresConnectorConfig.ALL_FIELDS;
    }
}
