package io.debezium.connector.postgresql;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresEventDispatcher.class */
/**
 * {@link EventDispatcher} specialization bound to {@link PostgresPartition} that adds a
 * dispatch path for {@link LogicalDecodingMessage}s.
 *
 * <p>Messages whose prefix is accepted by the configured {@link LogicalDecodingMessageFilter}
 * are handed to a {@link LogicalDecodingMessageMonitor}; the monitor calls back into this class
 * with a {@link SourceRecord}, which is wrapped in a {@link DataChangeEvent} and placed on the
 * same {@link ChangeEventQueue} that was passed to the superclass.
 *
 * @param <T> the data collection identifier type handled by this dispatcher
 */
public class PostgresEventDispatcher<T extends DataCollectionId> extends EventDispatcher<PostgresPartition, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresEventDispatcher.class);

    /** Queue that receives the events produced for logical decoding messages. */
    private final ChangeEventQueue<DataChangeEvent> queue;

    /** Receives accepted messages and calls back with the record to enqueue. */
    private final LogicalDecodingMessageMonitor logicalDecodingMessageMonitor;

    /** Decides, by message prefix, whether a logical decoding message is dispatched. */
    private final LogicalDecodingMessageFilter messageFilter;

    /**
     * Creates a dispatcher, delegating to the full constructor with {@code null} for the
     * inconsistent-schema handler, the heartbeat and the JDBC connection.
     *
     * <p>NOTE(review): a {@code null} heartbeat is forwarded to the superclass; confirm that
     * {@link EventDispatcher} tolerates it.
     */
    public PostgresEventDispatcher(
            PostgresConnectorConfig postgresConnectorConfig,
            TopicSelector<T> topicSelector,
            DatabaseSchema<T> databaseSchema,
            ChangeEventQueue<DataChangeEvent> changeEventQueue,
            DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter,
            ChangeEventCreator changeEventCreator,
            EventMetadataProvider eventMetadataProvider,
            SchemaNameAdjuster schemaNameAdjuster) {
        this(postgresConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter,
                changeEventCreator, null, eventMetadataProvider, null, schemaNameAdjuster, null);
    }

    /**
     * Creates a dispatcher with an explicit heartbeat, delegating to the full constructor with
     * {@code null} for the inconsistent-schema handler and the JDBC connection.
     */
    public PostgresEventDispatcher(
            PostgresConnectorConfig postgresConnectorConfig,
            TopicSelector<T> topicSelector,
            DatabaseSchema<T> databaseSchema,
            ChangeEventQueue<DataChangeEvent> changeEventQueue,
            DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter,
            ChangeEventCreator changeEventCreator,
            EventMetadataProvider eventMetadataProvider,
            Heartbeat heartbeat,
            SchemaNameAdjuster schemaNameAdjuster) {
        this(postgresConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter,
                changeEventCreator, null, eventMetadataProvider, heartbeat, schemaNameAdjuster, null);
    }

    /**
     * Creates a dispatcher from all collaborators.
     *
     * <p>Every argument except {@code jdbcConnection} is forwarded to the superclass
     * constructor. The connector configuration additionally supplies the message filter and is
     * used to build the logical decoding message monitor.
     *
     * @param postgresConnectorConfig connector configuration; source of the message filter
     * @param changeEventQueue queue that also receives logical decoding message events
     * @param jdbcConnection not used by this constructor; retained so the signature stays
     *        compatible with existing callers
     */
    public PostgresEventDispatcher(
            PostgresConnectorConfig postgresConnectorConfig,
            TopicSelector<T> topicSelector,
            DatabaseSchema<T> databaseSchema,
            ChangeEventQueue<DataChangeEvent> changeEventQueue,
            DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter,
            ChangeEventCreator changeEventCreator,
            EventDispatcher.InconsistentSchemaHandler<PostgresPartition, T> inconsistentSchemaHandler,
            EventMetadataProvider eventMetadataProvider,
            Heartbeat heartbeat,
            SchemaNameAdjuster schemaNameAdjuster,
            JdbcConnection jdbcConnection) {
        super(postgresConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter,
                changeEventCreator, inconsistentSchemaHandler, eventMetadataProvider, heartbeat, schemaNameAdjuster);
        this.queue = changeEventQueue;
        // The monitor reports back through enqueueLogicalDecodingMessage.
        this.logicalDecodingMessageMonitor = new LogicalDecodingMessageMonitor(postgresConnectorConfig, this::enqueueLogicalDecodingMessage);
        this.messageFilter = postgresConnectorConfig.getMessageFilter();
    }

    /**
     * Forwards a logical decoding message to the monitor if its prefix passes the message
     * filter; otherwise the message is dropped and a trace entry is logged.
     *
     * @param partition passed through to the monitor
     * @param offsetContext passed through to the monitor
     * @param l passed through to the monitor unchanged
     *        (NOTE(review): meaning not visible here, presumably a timestamp; confirm)
     * @param logicalDecodingMessage the message to dispatch; its prefix drives the filtering
     * @throws InterruptedException propagated from the monitor call
     */
    public void dispatchLogicalDecodingMessage(Partition partition, OffsetContext offsetContext, Long l, LogicalDecodingMessage logicalDecodingMessage) throws InterruptedException {
        if (this.messageFilter.isIncluded(logicalDecodingMessage.getPrefix())) {
            this.logicalDecodingMessageMonitor.logicalDecodingMessageEvent(partition, offsetContext, l, logicalDecodingMessage);
        } else {
            // A space precedes the placeholder so the prefix is not fused to the word "prefix".
            LOGGER.trace("Filtered data change event for logical decoding message with prefix {}", logicalDecodingMessage.getPrefix());
        }
    }

    /**
     * Callback given to the monitor: wraps the record in a {@link DataChangeEvent} and puts it
     * on the queue.
     *
     * @throws InterruptedException propagated from {@link ChangeEventQueue#enqueue}
     */
    private void enqueueLogicalDecodingMessage(SourceRecord sourceRecord) throws InterruptedException {
        this.queue.enqueue(new DataChangeEvent(sourceRecord));
    }
}
