package io.debezium.connector.postgresql;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.LoggingContext;
import io.debezium.util.Strings;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.jdbc.PgConnection;

@ThreadSafe
/* loaded from: input_file:io/debezium/connector/postgresql/RecordsStreamProducer.class */
public class RecordsStreamProducer extends RecordsProducer {
    /** Name handed to the task context whenever this producer configures its logging context. */
    private static final String CONTEXT_NAME = "records-stream-producer";
    /** Single-thread executor to which {@code start} submits the streaming loop; shut down in {@code stop}. */
    private final ExecutorService executorService;
    /** Replication connection obtained from the task context in the constructor; streams are started on it. */
    private final ReplicationConnection replicationConnection;
    /** Holds the active replication stream; set by {@code start} and reset to {@code null} by {@code stop}. */
    private final AtomicReference<ReplicationStream> replicationStream;
    /** Lazily created connection passed to {@code PgArray} when decoding array columns; closed in {@code closeConnections}. */
    private PgConnection typeResolverConnection;
    // Decompiler-exposed flag that guards the assertion checks in this class; it is
    // assigned in the static initializer from the class's desired assertion status.
    static final /* synthetic */ boolean $assertionsDisabled;

    public RecordsStreamProducer(PostgresTaskContext postgresTaskContext, SourceInfo sourceInfo) {
        super(postgresTaskContext, sourceInfo);
        this.typeResolverConnection = null;
        this.executorService = Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, "records-stream-producer-thread");
        });
        this.replicationStream = new AtomicReference<>();
        try {
            this.replicationConnection = postgresTaskContext.createReplicationConnection();
        } catch (SQLException e) {
            throw new ConnectException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts streaming changes and hands every produced record to the given consumer.
     *
     * <p>If the source info has a last known position, streaming starts from that LSN; otherwise
     * it starts from the replication connection's default position. The schema is then refreshed
     * and the streaming loop is submitted to the executor.
     *
     * @param consumer receives each {@link SourceRecord} produced by the streaming loop
     * @throws ConnectException if a checked exception is raised while starting; unchecked
     *         exceptions propagate unchanged
     */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public void start(Consumer<SourceRecord> consumer) {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            if (this.sourceInfo.hasLastKnownPosition()) {
                Long lsn = this.sourceInfo.lsn();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("retrieved latest position from stored offset '{}'", ReplicationConnection.format(lsn.longValue()));
                }
                this.replicationStream.compareAndSet(null, this.replicationConnection.startStreaming(lsn));
            } else {
                this.logger.info("no previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
                this.replicationStream.compareAndSet(null, this.replicationConnection.startStreaming());
            }
            this.taskContext.refreshSchema(true);
            this.executorService.submit(() -> streamChanges(consumer));
        } catch (RuntimeException e) {
            // Unchecked failures keep their original type for callers.
            throw e;
        } catch (Exception e) {
            // This method declares no checked exceptions, so wrap them the same way the
            // constructor and commit() do.
            throw new ConnectException(e);
        } finally {
            // Always put the caller's logging context back, on success and on failure.
            previousContext.restore();
        }
    }

    /**
     * Streaming loop: reads messages from the current replication stream and processes them
     * until the running thread is interrupted.
     *
     * <p>Any exception is logged, reported to the task context via {@code failTask}, and
     * rethrown wrapped in a {@link ConnectException}, which ends the loop.
     *
     * @param consumer receives the records generated from each message
     */
    private void streamChanges(Consumer<SourceRecord> consumer) {
        final ReplicationStream stream = this.replicationStream.get();
        final Thread worker = Thread.currentThread();
        while (!worker.isInterrupted()) {
            try {
                PgProto.RowMessage message = stream.read();
                process(message, stream.lastReceivedLSN(), consumer);
            } catch (SQLException sqlFailure) {
                // An IO-caused SQL failure gets a quieter log line; everything else is an error.
                if (sqlFailure.getCause() instanceof IOException) {
                    this.logger.warn("Closing replication stream due to db connection IO exception...");
                } else {
                    this.logger.error("unexpected exception while streaming logical changes", sqlFailure);
                }
                this.taskContext.failTask(sqlFailure);
                throw new ConnectException(sqlFailure);
            } catch (Exception failure) {
                this.logger.error("unexpected exception while streaming logical changes", failure);
                this.taskContext.failTask(failure);
                throw new ConnectException(failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Commit callback: asks the active replication stream to flush its LSN.
     *
     * <p>When no stream is active the call is only logged.
     *
     * @throws ConnectException if flushing raises an {@link SQLException}
     */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public synchronized void commit() {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            ReplicationStream stream = this.replicationStream.get();
            if (stream == null) {
                this.logger.debug("streaming has already stopped, ignoring commit callback...");
                return;
            }
            this.logger.debug("flushing offsets to server...");
            stream.flushLSN();
        } catch (SQLException cause) {
            throw new ConnectException(cause);
        } finally {
            previousContext.restore();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the producer.
     *
     * <p>Connections are closed only when a replication stream is currently set. In every case
     * the stream reference is cleared, the executor is shut down with {@code shutdownNow()}, and
     * the previous logging context is restored.
     */
    @Override // io.debezium.connector.postgresql.RecordsProducer
    public synchronized void stop() {
        LoggingContext.PreviousContext previousContext = this.taskContext.configureLoggingContext(CONTEXT_NAME);
        try {
            boolean streaming = this.replicationStream.get() != null;
            if (streaming) {
                closeConnections();
            } else {
                this.logger.debug("already stopped....");
            }
        } finally {
            this.replicationStream.set(null);
            this.executorService.shutdownNow();
            previousContext.restore();
        }
    }

    /**
     * Closes the replication connection and then the type resolver connection.
     *
     * <p>Both connections are attempted exactly once, even when the first close fails. A failure
     * is reported as a {@link ConnectException}. If both closes fail, the type resolver failure
     * is the cause and the replication connection failure is attached as a suppressed exception.
     *
     * @throws ConnectException if closing either connection raises an exception
     */
    private void closeConnections() {
        Exception replicationCloseFailure = null;
        try {
            if (this.replicationConnection != null) {
                this.logger.debug("stopping streaming...");
                this.replicationConnection.close();
            }
        } catch (Exception e) {
            // Remember the failure so the type resolver connection still gets closed below.
            replicationCloseFailure = e;
        } finally {
            // Runs for every outcome of the first close, including an Error propagating out.
            try {
                if (this.typeResolverConnection != null) {
                    this.typeResolverConnection.close();
                }
            } catch (Exception e) {
                ConnectException rethrown = new ConnectException(e);
                if (replicationCloseFailure != null) {
                    rethrown.addSuppressed(replicationCloseFailure);
                }
                throw rethrown;
            }
        }
        if (replicationCloseFailure != null) {
            throw new ConnectException(replicationCloseFailure);
        }
    }

    /**
     * Turns one decoded row message into source records.
     *
     * <p>A {@code null} message is ignored. Otherwise the source info is updated with the LSN,
     * commit time and transaction id of the message, and the message is dispatched by operation
     * to the create/update/delete record generators. Messages are skipped when no table schema
     * is available for the table or when the table has no key schema.
     *
     * @param rowMessage the decoded message, may be {@code null}
     * @param l the LSN at which the message was received
     * @param consumer receives the generated records
     * @throws SQLException if looking up the schema or extracting column values fails
     */
    private void process(PgProto.RowMessage rowMessage, Long l, Consumer<SourceRecord> consumer) throws SQLException {
        if (rowMessage == null) {
            return;
        }
        TableId tableId = PostgresSchema.parse(rowMessage.getTable());
        if (!$assertionsDisabled && tableId == null) {
            throw new AssertionError();
        }
        // The position is recorded before any filtering below.
        this.sourceInfo.update(l, Long.valueOf(rowMessage.getCommitTime()), Integer.valueOf(rowMessage.getTransactionId()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("received new message at position {}\n{}", ReplicationConnection.format(l.longValue()), rowMessage);
        }
        TableSchema tableSchema = tableSchemaFor(tableId);
        if (tableSchema == null) {
            return;
        }
        if (tableSchema.keySchema() == null) {
            this.logger.warn("ignoring message for table '{}' because it does not have a primary key defined", tableId);
            // Actually ignore it, as the log line says; previously processing continued and
            // update messages were still emitted with a null key.
            return;
        }
        PgProto.Op op = rowMessage.getOp();
        switch (op) {
            case INSERT:
                generateCreateRecord(tableId, columnValues(rowMessage.getNewTupleList(), tableId, true), consumer);
                return;
            case UPDATE:
                generateUpdateRecord(tableId, columnValues(rowMessage.getOldTupleList(), tableId, true), columnValues(rowMessage.getNewTupleList(), tableId, true), consumer);
                return;
            case DELETE:
                // Deletes never trigger a schema refresh (third argument is false).
                generateDeleteRecord(tableId, columnValues(rowMessage.getOldTupleList(), tableId, false), consumer);
                return;
            default:
                this.logger.warn("unknown message operation: {}", op);
                return;
        }
    }

    /**
     * Emits a create event for a newly inserted row.
     *
     * <p>Nothing is emitted when there are no values, or when the table schema yields a
     * {@code null} key or value for them.
     *
     * @param tableId the table the row belongs to
     * @param objArr the row values, positioned by table column
     * @param consumer receives the create record
     */
    protected void generateCreateRecord(TableId tableId, Object[] objArr, Consumer<SourceRecord> consumer) {
        boolean noValues = objArr == null || objArr.length == 0;
        if (noValues) {
            this.logger.warn("no new values found for table '{}' from update message at '{}';skipping record", tableId, this.sourceInfo);
            return;
        }

        TableSchema tableSchema = schema().schemaFor(tableId);
        if (!$assertionsDisabled && tableSchema == null) {
            throw new AssertionError();
        }

        Object key = tableSchema.keyFromColumnData(objArr);
        Struct value = tableSchema.valueFromColumnData(objArr);
        if (key == null || value == null) {
            return;
        }

        Schema keySchema = tableSchema.keySchema();
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topic = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topic);

        Schema valueSchema = envelope.schema();
        Struct payload = envelope.create(value, this.sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis()));
        SourceRecord createRecord = new SourceRecord(partition, offset, topic, (Integer) null, keySchema, key, valueSchema, payload);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending create event '{}' to topic '{}'", createRecord, topic);
        }
        consumer.accept(createRecord);
    }

    /**
     * Emits the event(s) for an updated row.
     *
     * <p>When no old key is available, or the old and new keys are equal, a single update event
     * is emitted. When the key changed, three records are emitted in order: a delete event for
     * the old key, a tombstone (null value) for the old key, and a create event for the new key.
     *
     * @param tableId the table the row belongs to
     * @param objArr the old row values, may be {@code null} or empty
     * @param objArr2 the new row values; when {@code null} or empty nothing is emitted
     * @param consumer receives the generated records
     */
    protected void generateUpdateRecord(TableId tableId, Object[] objArr, Object[] objArr2, Consumer<SourceRecord> consumer) {
        boolean noNewValues = objArr2 == null || objArr2.length == 0;
        if (noNewValues) {
            this.logger.warn("no values found for table '{}' from update message at '{}';skipping record", tableId, this.sourceInfo);
            return;
        }

        TableSchema tableSchema = schema().schemaFor(tableId);
        if (!$assertionsDisabled && tableSchema == null) {
            throw new AssertionError();
        }

        // The old-row side stays null unless old values were supplied.
        Object oldKey = null;
        Schema oldKeySchema = null;
        Struct oldValue = null;
        boolean hasOldValues = objArr != null && objArr.length > 0;
        if (hasOldValues) {
            oldKey = tableSchema.keyFromColumnData(objArr);
            oldKeySchema = tableSchema.keySchema();
            oldValue = tableSchema.valueFromColumnData(objArr);
        }

        Object newKey = tableSchema.keyFromColumnData(objArr2);
        Struct newValue = tableSchema.valueFromColumnData(objArr2);
        Schema newKeySchema = tableSchema.keySchema();
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topic = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topic);
        Struct source = this.sourceInfo.source();

        boolean keyChanged = oldKey != null && !Objects.equals(oldKey, newKey);
        if (!keyChanged) {
            Schema valueSchema = envelope.schema();
            Struct payload = envelope.update(oldValue, newValue, source, Long.valueOf(clock().currentTimeInMillis()));
            consumer.accept(new SourceRecord(partition, offset, topic, (Integer) null, newKeySchema, newKey, valueSchema, payload));
            return;
        }

        // Key change: delete the old key, tombstone it, then create under the new key.
        SourceRecord deleteRecord = new SourceRecord(partition, offset, topic, (Integer) null, oldKeySchema, oldKey, envelope.schema(), envelope.delete(oldValue, source, Long.valueOf(clock().currentTimeInMillis())));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending delete event '{}' to topic '{}'", deleteRecord, topic);
        }
        consumer.accept(deleteRecord);

        SourceRecord tombstone = new SourceRecord(partition, offset, topic, (Integer) null, oldKeySchema, oldKey, (Schema) null, (Object) null);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending tombstone event '{}' to topic '{}'", tombstone, topic);
        }
        consumer.accept(tombstone);

        SourceRecord createRecord = new SourceRecord(partition, offset, topic, (Integer) null, newKeySchema, newKey, envelope.schema(), envelope.create(newValue, source, Long.valueOf(clock().currentTimeInMillis())));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending create event '{}' to topic '{}'", createRecord, topic);
        }
        consumer.accept(createRecord);
    }

    /**
     * Emits a delete event followed by a tombstone (null value) record for a deleted row.
     *
     * <p>Nothing is emitted when there are no values, or when the table schema yields a
     * {@code null} key or value for them.
     *
     * @param tableId the table the row belonged to
     * @param objArr the old row values, positioned by table column
     * @param consumer receives the delete record and then the tombstone
     */
    protected void generateDeleteRecord(TableId tableId, Object[] objArr, Consumer<SourceRecord> consumer) {
        boolean noValues = objArr == null || objArr.length == 0;
        if (noValues) {
            this.logger.warn("no values found for table '{}' from update message at '{}';skipping record", tableId, this.sourceInfo);
            return;
        }

        TableSchema tableSchema = schema().schemaFor(tableId);
        if (!$assertionsDisabled && tableSchema == null) {
            throw new AssertionError();
        }

        Object key = tableSchema.keyFromColumnData(objArr);
        Struct value = tableSchema.valueFromColumnData(objArr);
        if (key == null || value == null) {
            return;
        }

        Schema keySchema = tableSchema.keySchema();
        Map<String, String> partition = this.sourceInfo.partition();
        Map<String, ?> offset = this.sourceInfo.offset();
        String topic = topicSelector().topicNameFor(tableId);
        Envelope envelope = createEnvelope(tableSchema, topic);

        Schema valueSchema = envelope.schema();
        Struct payload = envelope.delete(value, this.sourceInfo.source(), Long.valueOf(clock().currentTimeInMillis()));
        SourceRecord deleteRecord = new SourceRecord(partition, offset, topic, (Integer) null, keySchema, key, valueSchema, payload);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending delete event '{}' to topic '{}'", deleteRecord, topic);
        }
        consumer.accept(deleteRecord);

        SourceRecord tombstone = new SourceRecord(partition, offset, topic, (Integer) null, keySchema, key, (Schema) null, (Object) null);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("sending tombstone event '{}' to topic '{}'", tombstone, topic);
        }
        consumer.accept(tombstone);
    }

    /**
     * Converts the datums of a tuple into an array of values positioned by table column.
     *
     * <p>When {@code z} is true and {@link #schemaChanged} reports a difference between the
     * message and the known table, the table schema is refreshed over a fresh connection before
     * the values are laid out.
     *
     * <p>The array is sized to hold every table column, so a tuple carrying only a subset of the
     * columns can still be placed by column position. Datums whose column is not part of the
     * table are skipped with a warning (they still trip the assertion when assertions are on).
     *
     * @param list the datums of the tuple, may be {@code null} or empty
     * @param tableId the table the tuple belongs to
     * @param z whether to check for a schema change and refresh the table schema if needed
     * @return the values by column position, or {@code null} when there are no datums
     * @throws SQLException if refreshing the schema fails
     */
    private Object[] columnValues(List<PgProto.DatumMessage> list, TableId tableId, boolean z) throws SQLException {
        if (list == null || list.isEmpty()) {
            return null;
        }
        Table table = schema().tableFor(tableId);
        if (!$assertionsDisabled && table == null) {
            throw new AssertionError();
        }
        if (z && schemaChanged(list, table)) {
            // try-with-resources closes the connection exactly once on every path.
            try (PostgresConnection connection = this.taskContext.createConnection()) {
                schema().refresh(connection, tableId);
            }
            table = schema().tableFor(tableId);
        }
        List<String> columnNames = table.columnNames();
        // Values are indexed by table column position, so the array must cover all table
        // columns even when the message carries fewer datums.
        Object[] values = new Object[Math.max(list.size(), columnNames.size())];
        for (PgProto.DatumMessage datum : list) {
            // Column names may arrive quoted in messages; the table stores them unquoted.
            int position = columnNames.indexOf(Strings.unquoteIdentifierPart(datum.getColumnName()));
            if (!$assertionsDisabled && position < 0) {
                throw new AssertionError();
            }
            if (position < 0) {
                this.logger.warn("column '{}' of table '{}' is not part of the table metadata; skipping its value", datum.getColumnName(), tableId);
                continue;
            }
            values[position] = extractValueFromMessage(datum);
        }
        return values;
    }

    /**
     * Decides whether the table metadata looks out of date compared to a received tuple.
     *
     * <p>Returns {@code true} when the number of datums differs from the number of table columns,
     * when a datum names a column the table does not have, or when
     * {@code schema().isType(typeName, jdbcType)} rejects a known column's type.
     *
     * <p>Column names are unquoted before the lookup, matching how {@code columnValues} resolves
     * them; otherwise a quoted name would look like a new column on every message.
     *
     * @param list the datums of the received tuple
     * @param table the currently known table definition
     * @return {@code true} if the table schema should be refreshed
     */
    private boolean schemaChanged(List<PgProto.DatumMessage> list, Table table) {
        if (table.columnNames().size() != list.size()) {
            return true;
        }
        for (PgProto.DatumMessage datum : list) {
            String columnName = Strings.unquoteIdentifierPart(datum.getColumnName());
            Column column = table.columnWithName(columnName);
            if (column == null) {
                this.logger.debug("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
                return true;
            }
            // NOTE(review): this check only consults the column's own metadata, not the type
            // carried by the message - confirm against PostgresSchema.isType that this is intended.
            if (!schema().isType(column.typeName(), column.jdbcType())) {
                this.logger.debug("detected new type for column '{}', old type was '{}', new type is '{}'; refreshing table schema", columnName, Integer.valueOf(column.jdbcType()), Long.valueOf(datum.getColumnType()));
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the table schema for a table, refreshing the schema once if it is not yet known.
     *
     * @param tableId the table to look up
     * @return the table schema, or {@code null} if the table is filtered out or its schema
     *         still cannot be found after a refresh
     * @throws SQLException if refreshing the schema or closing the connection fails
     */
    private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
        PostgresSchema schema = schema();
        if (schema.isFilteredOut(tableId)) {
            this.logger.debug("table '{}' is filtered out, ignoring", tableId);
            return null;
        }
        TableSchema known = schema.schemaFor(tableId);
        if (known != null) {
            return known;
        }
        // Unknown table: refresh over a short-lived connection. try-with-resources closes it
        // exactly once on every path.
        try (PostgresConnection connection = this.taskContext.createConnection()) {
            schema.refresh(connection, tableId);
        }
        TableSchema refreshed = schema.schemaFor(tableId);
        if (refreshed == null) {
            this.logger.warn("cannot load schema for table '{}'", tableId);
            return null;
        }
        this.logger.debug("refreshed DB schema to include table '{}'", tableId);
        return refreshed;
    }

    /**
     * Extracts the Java value carried by a datum, based on the datum's column type.
     *
     * <p>The numeric case labels are PostgreSQL built-in type OIDs ({@code pg_type.oid}); the
     * type names in the comments below come from the PostgreSQL catalog. Every case returns
     * {@code null} when the datum does not carry the field expected for its type.
     *
     * @param datumMessage the datum to decode
     * @return the decoded value, or {@code null} if the expected datum field is absent or an
     *         array value cannot be decoded
     */
    protected Object extractValueFromMessage(PgProto.DatumMessage datumMessage) {
        int columnType = (int) datumMessage.getColumnType();
        switch (columnType) {
            case 16: // bool
                if (datumMessage.hasDatumBool()) {
                    return Boolean.valueOf(datumMessage.getDatumBool());
                }
                return null;
            case 17: // bytea
                if (datumMessage.hasDatumBytes()) {
                    return datumMessage.getDatumBytes().toByteArray();
                }
                return null;
            // Types delivered as strings.
            case 18: // char
            case 25: // text
            case 114: // json
            case 142: // xml
            case 1042: // bpchar
            case 1043: // varchar
            case 1560: // bit
            case 1562: // varbit
            case 2950: // uuid
            case 3802: // jsonb
                if (datumMessage.hasDatumString()) {
                    return datumMessage.getDatumString();
                }
                return null;
            case 20: // int8
            case 26: // oid
            case 790: // money
                if (datumMessage.hasDatumInt64()) {
                    return Long.valueOf(datumMessage.getDatumInt64());
                }
                return null;
            case 21: // int2
            case 23: // int4
                if (datumMessage.hasDatumInt32()) {
                    return Integer.valueOf(datumMessage.getDatumInt32());
                }
                return null;
            // Array types: the datum bytes are decoded as UTF-8 text and parsed by PgArray.
            case 143: // xml[]
            case 199: // json[]
            case 791: // money[]
            case 1000: // bool[]
            case 1001: // bytea[]
            case 1002: // char[]
            case 1003: // name[]
            case 1005: // int2[]
            case 1007: // int4[]
            case 1009: // text[]
            case 1014: // bpchar[]
            case 1015: // varchar[]
            case 1016: // int8[]
            case 1017: // point[]
            case 1021: // float4[]
            case 1022: // float8[]
            case 1028: // oid[]
            case 1115: // timestamp[]
            case 1182: // date[]
            case 1183: // time[]
            case 1185: // timestamptz[]
            case 1187: // interval[]
            case 1231: // numeric[]
            case 1270: // timetz[]
            case 1563: // varbit[]
            case 2201: // refcursor[]
            case 2951: // uuid[]
            case 3807: // jsonb[]
                try {
                    byte[] byteArray = datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
                    if (byteArray == null) {
                        return null;
                    }
                    return Arrays.asList((Object[]) new PgArray(typeResolverConnection(), columnType, new String(byteArray, Charset.forName("UTF-8"))).getArray());
                } catch (SQLException e) {
                    // Best effort: an undecodable array is logged and reported as null.
                    this.logger.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
                    return null;
                }
            case 600: // point
                // Check presence like every other case; without it an absent datum would be
                // read as the default point and surface as (0, 0) instead of null.
                if (datumMessage.hasDatumPoint()) {
                    PgProto.Point datumPoint = datumMessage.getDatumPoint();
                    return new PGpoint(datumPoint.getX(), datumPoint.getY());
                }
                return null;
            case 700: // float4
                if (datumMessage.hasDatumFloat()) {
                    return Float.valueOf(datumMessage.getDatumFloat());
                }
                return null;
            case 701: // float8
            case 1700: // numeric
                if (datumMessage.hasDatumDouble()) {
                    return Double.valueOf(datumMessage.getDatumDouble());
                }
                return null;
            case 1082: // date
                // The int32 datum is widened to a Long.
                if (datumMessage.hasDatumInt32()) {
                    return Long.valueOf(datumMessage.getDatumInt32());
                }
                return null;
            case 1083: // time
            case 1114: // timestamp
            case 1184: // timestamptz
                // The int64 datum is treated as microseconds and converted to nanoseconds.
                if (datumMessage.hasDatumInt64()) {
                    return Long.valueOf(TimeUnit.NANOSECONDS.convert(datumMessage.getDatumInt64(), TimeUnit.MICROSECONDS));
                }
                return null;
            case 1186: // interval
                if (datumMessage.hasDatumDouble()) {
                    return Double.valueOf(datumMessage.getDatumDouble());
                }
                return null;
            case 1266: // timetz
                // The double datum is scaled by 1000 and truncated to a long
                // (presumably seconds to milliseconds - TODO confirm against the decoder plugin).
                if (datumMessage.hasDatumDouble()) {
                    return Long.valueOf(BigDecimal.valueOf(datumMessage.getDatumDouble() * 1000.0d).longValue());
                }
                return null;
            case 3910: // tstzrange
                if (datumMessage.hasDatumBytes()) {
                    return new String(datumMessage.getDatumBytes().toByteArray(), Charset.forName("UTF-8"));
                }
                return null;
            default:
                this.logger.warn("processing column '{}' with unknown data type '{}' as byte array", datumMessage.getColumnName(), Long.valueOf(datumMessage.getColumnType()));
                if (datumMessage.hasDatumBytes()) {
                    return datumMessage.getDatumBytes().toByteArray();
                }
                return null;
        }
    }

    /**
     * Returns the connection used for decoding array values, creating it from the task context
     * on first use and reusing it afterwards.
     *
     * @return the cached type resolver connection
     * @throws SQLException if the connection cannot be created
     */
    private synchronized PgConnection typeResolverConnection() throws SQLException {
        PgConnection existing = this.typeResolverConnection;
        if (existing != null) {
            return existing;
        }
        PgConnection created = this.taskContext.createConnection().connection();
        this.typeResolverConnection = created;
        return created;
    }

    // Decompiler rendering of assertion support: assertion checks in this class are skipped
    // when the class's desired assertion status is false.
    static {
        $assertionsDisabled = !RecordsStreamProducer.class.desiredAssertionStatus();
    }
}
