package io.debezium.connector.mysql;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlChangeEventSourceFactory.class */
/**
 * Builds the snapshot, streaming and incremental-snapshot change event sources
 * for the MySQL connector from one shared set of collaborators.
 *
 * <p>The factory also toggles buffering on the {@link ChangeEventQueue} it is given:
 * buffering is switched on through the pre-snapshot callback handed to the snapshot
 * source, and switched off again either when the buffered event is flushed or when a
 * streaming source is requested.
 */
public class MySqlChangeEventSourceFactory implements ChangeEventSourceFactory<MySqlPartition, MySqlOffsetContext> {
    private final MySqlConnectorConfig configuration;
    private final MainConnectionProvidingConnectionFactory<BinlogConnectorConnection> connectionFactory;
    private final ErrorHandler errorHandler;
    private final EventDispatcher<MySqlPartition, TableId> dispatcher;
    private final Clock clock;
    private final MySqlTaskContext taskContext;
    private final MySqlStreamingChangeEventSourceMetrics streamingMetrics;
    private final MySqlDatabaseSchema schema;
    // Queue whose buffering mode is enabled before a snapshot and disabled afterwards.
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final SnapshotterService snapshotterService;

    /**
     * Stores the collaborators used to construct the individual change event sources.
     * The arguments are kept as given; no validation is performed here.
     *
     * @param mySqlConnectorConfig connector configuration
     * @param mainConnectionProvidingConnectionFactory factory giving access to the main database connection
     * @param errorHandler error handler passed to the streaming source
     * @param eventDispatcher dispatcher passed to every created source
     * @param clock clock passed to every created source
     * @param mySqlDatabaseSchema schema passed to the incremental snapshot sources
     * @param mySqlTaskContext task context; its schema is passed to the snapshot source
     * @param mySqlStreamingChangeEventSourceMetrics metrics passed to the streaming source
     * @param changeEventQueue queue whose buffering mode this factory switches on and off
     * @param snapshotterService snapshotter service passed to the snapshot and streaming sources
     */
    public MySqlChangeEventSourceFactory(MySqlConnectorConfig mySqlConnectorConfig, MainConnectionProvidingConnectionFactory<BinlogConnectorConnection> mainConnectionProvidingConnectionFactory, ErrorHandler errorHandler, EventDispatcher<MySqlPartition, TableId> eventDispatcher, Clock clock, MySqlDatabaseSchema mySqlDatabaseSchema, MySqlTaskContext mySqlTaskContext, MySqlStreamingChangeEventSourceMetrics mySqlStreamingChangeEventSourceMetrics, ChangeEventQueue<DataChangeEvent> changeEventQueue, SnapshotterService snapshotterService) {
        this.configuration = mySqlConnectorConfig;
        this.connectionFactory = mainConnectionProvidingConnectionFactory;
        this.errorHandler = errorHandler;
        this.dispatcher = eventDispatcher;
        this.clock = clock;
        this.taskContext = mySqlTaskContext;
        this.streamingMetrics = mySqlStreamingChangeEventSourceMetrics;
        this.queue = changeEventQueue;
        this.schema = mySqlDatabaseSchema;
        this.snapshotterService = snapshotterService;
    }

    /**
     * Creates the snapshot source, wiring in {@link #preSnapshot()} and
     * {@link #modifyAndFlushLastRecord(Function)} as its callbacks.
     *
     * @param snapshotProgressListener progress listener; it is cast to
     *        {@code MySqlSnapshotChangeEventSourceMetrics}, so any other implementation
     *        fails with a {@link ClassCastException}
     * @param notificationService notification service handed to the snapshot source
     * @return a new {@code MySqlSnapshotChangeEventSource}
     */
    @Override
    public SnapshotChangeEventSource<MySqlPartition, MySqlOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MySqlPartition> snapshotProgressListener, NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
        return new MySqlSnapshotChangeEventSource(
                this.configuration,
                this.connectionFactory,
                this.taskContext.getSchema(),
                this.dispatcher,
                this.clock,
                (MySqlSnapshotChangeEventSourceMetrics) snapshotProgressListener,
                this::modifyAndFlushLastRecord,
                this::preSnapshot,
                notificationService,
                this.snapshotterService);
    }

    /** Pre-snapshot callback: switches the queue into buffering mode. */
    private void preSnapshot() {
        this.queue.enableBuffering();
    }

    /**
     * Flushes the queue's buffer, replacing each flushed event with a new
     * {@link DataChangeEvent} wrapping the record returned by {@code function},
     * and then switches buffering off.
     *
     * @param function transformation applied to the buffered event's record
     * @throws InterruptedException propagated from the queue's flush operation
     */
    private void modifyAndFlushLastRecord(Function<SourceRecord, SourceRecord> function) throws InterruptedException {
        this.queue.flushBuffer(buffered -> new DataChangeEvent(function.apply(buffered.getRecord())));
        this.queue.disableBuffering();
    }

    /**
     * Creates the streaming source on the main connection. Buffering on the queue
     * is switched off first.
     *
     * @return a new {@code MySqlStreamingChangeEventSource}
     */
    @Override
    public StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> getStreamingChangeEventSource() {
        this.queue.disableBuffering();
        return new MySqlStreamingChangeEventSource(
                this.configuration,
                this.connectionFactory.mainConnection(),
                this.dispatcher,
                this.errorHandler,
                this.clock,
                this.taskContext,
                this.streamingMetrics,
                this.snapshotterService);
    }

    /**
     * Selects the incremental snapshot implementation from the configuration.
     *
     * <ul>
     *   <li>Read-only connection with GTID mode enabled: the read-only implementation.</li>
     *   <li>Read-only connection without GTID mode: an exception is thrown.</li>
     *   <li>Otherwise: the signal-based implementation, or an empty {@link Optional}
     *       when no signaling data collection id is configured.</li>
     * </ul>
     *
     * @param mySqlOffsetContext offset context (not used by this implementation)
     * @param snapshotProgressListener progress listener handed to the created source
     * @param dataChangeEventListener data change listener handed to the created source
     * @param notificationService notification service handed to the created source
     * @return the incremental snapshot source, or empty when none applies
     * @throws UnsupportedOperationException if the connection is read-only and GTID mode is not enabled
     */
    @Override
    public Optional<IncrementalSnapshotChangeEventSource<MySqlPartition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(MySqlOffsetContext mySqlOffsetContext, SnapshotProgressListener<MySqlPartition> snapshotProgressListener, DataChangeEventListener<MySqlPartition> dataChangeEventListener, NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
        // NOTE(review): both sources below are constructed as raw types, as before;
        // if their classes are generic, prefer the diamond operator - confirm against their declarations.
        if (this.configuration.isReadOnlyConnection()) {
            if (!this.connectionFactory.mainConnection().isGtidModeEnabled()) {
                throw new UnsupportedOperationException("Read only connection requires GTID_MODE to be ON");
            }
            return Optional.of(new MySqlReadOnlyIncrementalSnapshotChangeEventSource(this.configuration, this.connectionFactory.mainConnection(), this.dispatcher, this.schema, this.clock, snapshotProgressListener, dataChangeEventListener, notificationService));
        }
        // Without a signaling data collection id no signal-based source is created.
        if (Strings.isNullOrEmpty(this.configuration.getSignalingDataCollectionId())) {
            return Optional.empty();
        }
        return Optional.of(new SignalBasedIncrementalSnapshotChangeEventSource(this.configuration, this.connectionFactory.mainConnection(), this.dispatcher, this.schema, this.clock, snapshotProgressListener, dataChangeEventListener, notificationService));
    }
}
