package io.debezium.connector.oracle.logminer.processor;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.oracle.CommitScn;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDefaultValueConverter;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleTaskContext;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.schema.SchemaTopicNamingStrategy;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import oracle.sql.CharacterSet;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.class */
public abstract class AbstractProcessorUnitTest<T extends AbstractLogMinerEventProcessor> extends AbstractConnectorTest {
    private static final String TRANSACTION_ID_1 = "1234567890";
    private static final String TRANSACTION_ID_2 = "9876543210";
    private static final String TRANSACTION_ID_3 = "9880212345";

    @Rule
    public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();
    protected ChangeEventSource.ChangeEventSourceContext context;
    protected EventDispatcher<OraclePartition, TableId> dispatcher;
    protected OracleDatabaseSchema schema;
    protected LogMinerStreamingChangeEventSourceMetrics metrics;
    protected OraclePartition partition;
    protected OracleOffsetContext offsetContext;
    protected OracleConnection connection;

    @Before
    public void before() throws Exception {
        this.context = (ChangeEventSource.ChangeEventSourceContext) Mockito.mock(ChangeEventSource.ChangeEventSourceContext.class);
        Mockito.when(Boolean.valueOf(this.context.isRunning())).thenReturn(true);
        this.dispatcher = (EventDispatcher) Mockito.mock(EventDispatcher.class);
        this.partition = (OraclePartition) Mockito.mock(OraclePartition.class);
        this.offsetContext = (OracleOffsetContext) Mockito.mock(OracleOffsetContext.class);
        Mockito.when(this.offsetContext.getCommitScn()).thenReturn(CommitScn.valueOf((String) null));
        Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.valueOf("1"));
        this.connection = createOracleConnection(false);
        this.schema = createOracleDatabaseSchema();
        this.metrics = createMetrics(this.schema);
    }

    @After
    public void after() {
        if (this.schema != null) {
            try {
                this.schema.close();
            } finally {
                this.schema = null;
            }
        }
    }

    protected abstract Configuration.Builder getConfig();

    protected abstract T getProcessor(OracleConnectorConfig oracleConnectorConfig);

    protected boolean isTransactionAbandonmentSupported() {
        return true;
    }

    @Test
    public void testCacheIsEmpty() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCacheIsNotEmptyWhenTransactionIsAdded() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCacheIsNotEmptyWhenTransactionIsAddedAndStartEventIsNotHandled() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCacheIsEmptyWhenTransactionIsCommitted() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            LogMinerEventRow insertLogMinerEventRow = getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1);
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(insertLogMinerEventRow);
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCacheIsEmptyWhenTransactionIsCommittedAndStartEventIsNotHandled() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCacheIsEmptyWhenTransactionIsRolledBack() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCacheIsEmptyWhenTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCacheIsNotEmptyWhenFirstTransactionIsRolledBack() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isTrue();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCacheIsNotEmptyWhenFirstTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isTrue();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCacheIsNotEmptyWhenSecondTransactionIsRolledBack() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_2));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isTrue();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCacheIsNotEmptyWhenSecondTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception {
        T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2));
            processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isTrue();
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isFalse();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCalculateScnWhenTransactionIsCommitted() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(2L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCalculateScnWhenTransactionIsCommittedAndStartEventIsNotHandled() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(1L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCalculateScnWhenFirstTransactionIsCommitted() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_1));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(3L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(6L), TRANSACTION_ID_2));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(4L).toString());
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCalculateScnWhenFirstTransactionIsCommittedAndStartEventIsNotHandled() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(2L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(2L).toString());
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testCalculateScnWhenSecondTransactionIsCommitted() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1));
            processor.handleStart(getStartLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_2));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(1L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testCalculateScnWhenSecondTransactionIsCommittedAndStartEventIsNotHandled() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        OraclePartition oraclePartition = new OraclePartition(oracleConnectorConfig.getLogicalName(), oracleConnectorConfig.getDatabaseName());
        T processor = getProcessor(oracleConnectorConfig);
        try {
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1));
            processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2));
            processor.handleCommit(oraclePartition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
            Assertions.assertThat(this.metrics.getOldestScn()).isEqualTo(Scn.valueOf(1L).toString());
            Assertions.assertThat(this.metrics.getRolledBackTransactionIds()).isEmpty();
            if (processor != null) {
                processor.close();
            }
        } catch (Throwable th) {
            if (processor != null) {
                try {
                    processor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-6679"})
    public void testEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
                Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(false);
                PreparedStatement preparedStatement = (PreparedStatement) Mockito.mock(PreparedStatement.class);
                Mockito.when(processor.createQueryStatement()).thenReturn(preparedStatement);
                Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
                Assertions.assertThat(processor.process(Scn.valueOf(100), Scn.valueOf(200))).isEqualTo(Scn.valueOf(100));
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-6679"})
    public void testNonEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
                Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{false});
                Mockito.when(resultSet.getString(1)).thenReturn("101");
                Mockito.when(resultSet.getString(2)).thenReturn("insert into \"DEBEZIUM\".\"ABC\"(\"ID\",\"DATA\") values ('1','test');");
                Mockito.when(Integer.valueOf(resultSet.getInt(3))).thenReturn(Integer.valueOf(EventType.INSERT.getValue()));
                Mockito.when(resultSet.getString(7)).thenReturn("ABC");
                Mockito.when(resultSet.getString(8)).thenReturn("DEBEZIUM");
                PreparedStatement preparedStatement = (PreparedStatement) Mockito.mock(PreparedStatement.class);
                Mockito.when(processor.createQueryStatement()).thenReturn(preparedStatement);
                Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
                AbstractLogMinerEventProcessor abstractLogMinerEventProcessor = (AbstractLogMinerEventProcessor) Mockito.spy(processor);
                ((AbstractLogMinerEventProcessor) Mockito.doReturn("CREATE TABLE DEBEZIUM.ABC (ID primary key(9,0), data varchar2(50))").when(abstractLogMinerEventProcessor)).getTableMetadataDdl((OracleConnection) Mockito.any(OracleConnection.class), (TableId) Mockito.any(TableId.class));
                ((AbstractLogMinerEventProcessor) Mockito.doReturn(Table.editor().tableId(TableId.parse(TestHelper.getDatabaseName() + ".DEBEZIUM.ABC")).addColumn(Column.editor().name("ID").create()).addColumn(Column.editor().name("DATA").create()).setPrimaryKeyNames(new String[]{"ID"}).create()).when(abstractLogMinerEventProcessor)).dispatchSchemaChangeEventAndGetTableForNewCapturedTable((TableId) Mockito.any(TableId.class), (OracleOffsetContext) Mockito.any(OracleOffsetContext.class), (EventDispatcher) Mockito.any(EventDispatcher.class));
                Assertions.assertThat(abstractLogMinerEventProcessor.process(Scn.valueOf(100), Scn.valueOf(200))).isEqualTo(Scn.valueOf(101));
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    public void testAbandonOneTransaction() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                Instant minus = Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS);
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1, minus));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testAbandonOneTransactionAndStartEventIsNotHandled() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS)));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isTrue();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    public void testAbandonTransactionHavingAnotherOne() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                Instant minus = Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS);
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_2));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNotNull();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testAbandonTransactionHavingAnotherOneAndStartEventIsNotHandled() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS)));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNotNull();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-6355"})
    public void testAbandonTransactionsUsingFallbackBasedOnChangeTime() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            this.schema.close();
            this.connection = createOracleConnection(true);
            this.schema = createOracleDatabaseSchema();
            this.metrics = createMetrics(this.schema);
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                Instant minus = Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS);
                Instant minus2 = Instant.now().minus(23L, (TemporalUnit) ChronoUnit.HOURS);
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2, minus2));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(5L), TRANSACTION_ID_2, minus2));
                processor.processRow(this.partition, getStartLogMinerEventRow(Scn.valueOf(6L), TRANSACTION_ID_3));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(7L), TRANSACTION_ID_3));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_3)).isNotNull();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-7473"})
    public void testAbandonTransactionsUsingFallbackBasedOnChangeTimeAndStartEventIsNotHandled() throws Exception {
        if (isTransactionAbandonmentSupported()) {
            this.schema.close();
            this.connection = createOracleConnection(true);
            this.schema = createOracleDatabaseSchema();
            this.metrics = createMetrics(this.schema);
            T processor = getProcessor(new OracleConnectorConfig(getConfig().build()));
            try {
                Mockito.when(this.offsetContext.getScn()).thenReturn(Scn.valueOf(1L));
                Mockito.when(this.offsetContext.getSnapshotScn()).thenReturn(Scn.NULL);
                Instant minus = Instant.now().minus(24L, (TemporalUnit) ChronoUnit.HOURS);
                Instant minus2 = Instant.now().minus(23L, (TemporalUnit) ChronoUnit.HOURS);
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, minus));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2, minus2));
                processor.processRow(this.partition, getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_3));
                processor.abandonTransactions(Duration.ofHours(1L));
                Assertions.assertThat(processor.getTransactionCache().isEmpty()).isFalse();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNull();
                Assertions.assertThat(processor.getTransactionCache().get(TRANSACTION_ID_3)).isNotNull();
                if (processor != null) {
                    processor.close();
                }
            } catch (Throwable th) {
                if (processor != null) {
                    try {
                        processor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private OracleDatabaseSchema createOracleDatabaseSchema() throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        SchemaTopicNamingStrategy create = SchemaTopicNamingStrategy.create(oracleConnectorConfig);
        SchemaNameAdjuster schemaNameAdjuster = oracleConnectorConfig.schemaNameAdjuster();
        OracleValueConverters valueConverter = oracleConnectorConfig.getAdapter().getValueConverter(oracleConnectorConfig, this.connection);
        OracleDatabaseSchema oracleDatabaseSchema = new OracleDatabaseSchema(oracleConnectorConfig, valueConverter, new OracleDefaultValueConverter(valueConverter, this.connection), schemaNameAdjuster, create, oracleConnectorConfig.getAdapter().getTableNameCaseSensitivity(this.connection), false);
        oracleDatabaseSchema.refresh(Table.editor().tableId(TableId.parse("ORCLPDB1.DEBEZIUM.TEST_TABLE")).addColumn(Column.editor().name("ID").create()).addColumn(Column.editor().name("DATA").create()).create());
        return oracleDatabaseSchema;
    }

    private OracleConnection createOracleConnection(boolean z) throws Exception {
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true);
        Mockito.when(Float.valueOf(resultSet.getFloat(1))).thenReturn(Float.valueOf(2.0f));
        PreparedStatement preparedStatement = (PreparedStatement) Mockito.mock(PreparedStatement.class);
        Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
        Connection connection = (Connection) Mockito.mock(Connection.class);
        Mockito.when(connection.prepareStatement((String) Mockito.any())).thenReturn(preparedStatement);
        OracleConnection oracleConnection = (OracleConnection) Mockito.mock(OracleConnection.class);
        Mockito.when(oracleConnection.connection(Mockito.anyBoolean())).thenReturn(connection);
        Mockito.when(oracleConnection.connection()).thenReturn(connection);
        Mockito.when(oracleConnection.getNationalCharacterSet()).thenReturn(CharacterSet.make(871));
        if (z) {
            Mockito.when(oracleConnection.singleOptionalValue(ArgumentMatchers.anyString(), (JdbcConnection.ResultSetExtractor) ArgumentMatchers.any())).thenThrow(new Throwable[]{new SQLException("ORA-01555 Snapshot too old", (String) null, 1555)});
        } else {
            Mockito.when(oracleConnection.singleOptionalValue(ArgumentMatchers.anyString(), (JdbcConnection.ResultSetExtractor) ArgumentMatchers.any())).thenReturn(BigInteger.TWO);
        }
        return oracleConnection;
    }

    private LogMinerStreamingChangeEventSourceMetrics createMetrics(OracleDatabaseSchema oracleDatabaseSchema) throws Exception {
        OracleConnectorConfig oracleConnectorConfig = new OracleConnectorConfig(getConfig().build());
        return new LogMinerStreamingChangeEventSourceMetrics(new OracleTaskContext(oracleConnectorConfig, oracleDatabaseSchema), new ChangeEventQueue.Builder().pollInterval(Duration.of(8192L, ChronoUnit.MILLIS)).maxBatchSize(2048).maxQueueSize(8192).build(), (EventMetadataProvider) null, oracleConnectorConfig);
    }

    private LogMinerEventRow getStartLogMinerEventRow(Scn scn, String str) {
        return getStartLogMinerEventRow(scn, str, Instant.now());
    }

    private LogMinerEventRow getStartLogMinerEventRow(Scn scn, String str, Instant instant) {
        LogMinerEventRow logMinerEventRow = (LogMinerEventRow) Mockito.mock(LogMinerEventRow.class);
        Mockito.when(logMinerEventRow.getEventType()).thenReturn(EventType.START);
        Mockito.when(logMinerEventRow.getTransactionId()).thenReturn(str);
        Mockito.when(logMinerEventRow.getScn()).thenReturn(scn);
        Mockito.when(logMinerEventRow.getChangeTime()).thenReturn(instant);
        return logMinerEventRow;
    }

    private LogMinerEventRow getCommitLogMinerEventRow(Scn scn, String str) {
        LogMinerEventRow logMinerEventRow = (LogMinerEventRow) Mockito.mock(LogMinerEventRow.class);
        Mockito.when(logMinerEventRow.getEventType()).thenReturn(EventType.COMMIT);
        Mockito.when(logMinerEventRow.getTransactionId()).thenReturn(str);
        Mockito.when(logMinerEventRow.getScn()).thenReturn(scn);
        Mockito.when(logMinerEventRow.getChangeTime()).thenReturn(Instant.now());
        return logMinerEventRow;
    }

    private LogMinerEventRow getRollbackLogMinerEventRow(Scn scn, String str) {
        LogMinerEventRow logMinerEventRow = (LogMinerEventRow) Mockito.mock(LogMinerEventRow.class);
        Mockito.when(logMinerEventRow.getEventType()).thenReturn(EventType.ROLLBACK);
        Mockito.when(logMinerEventRow.getTransactionId()).thenReturn(str);
        Mockito.when(logMinerEventRow.getScn()).thenReturn(scn);
        Mockito.when(logMinerEventRow.getChangeTime()).thenReturn(Instant.now());
        return logMinerEventRow;
    }

    private LogMinerEventRow getInsertLogMinerEventRow(Scn scn, String str) {
        return getInsertLogMinerEventRow(scn, str, Instant.now());
    }

    private LogMinerEventRow getInsertLogMinerEventRow(Scn scn, String str, Instant instant) {
        LogMinerEventRow logMinerEventRow = (LogMinerEventRow) Mockito.mock(LogMinerEventRow.class);
        Mockito.when(logMinerEventRow.getEventType()).thenReturn(EventType.INSERT);
        Mockito.when(logMinerEventRow.getTransactionId()).thenReturn(str);
        Mockito.when(logMinerEventRow.getScn()).thenReturn(scn);
        Mockito.when(logMinerEventRow.getChangeTime()).thenReturn(instant);
        Mockito.when(logMinerEventRow.getRowId()).thenReturn(TRANSACTION_ID_1);
        Mockito.when(logMinerEventRow.getOperation()).thenReturn("INSERT");
        Mockito.when(logMinerEventRow.getTableName()).thenReturn("TEST_TABLE");
        Mockito.when(logMinerEventRow.getTableId()).thenReturn(TableId.parse("ORCLPDB1.DEBEZIUM.TEST_TABLE"));
        Mockito.when(logMinerEventRow.getRedoSql()).thenReturn("insert into \"DEBEZIUM\".\"TEST_TABLE\"(\"ID\",\"DATA\") values ('1','Test');");
        Mockito.when(logMinerEventRow.getRsId()).thenReturn("A.B.C");
        Mockito.when(logMinerEventRow.getTablespaceName()).thenReturn("DEBEZIUM");
        Mockito.when(logMinerEventRow.getUserName()).thenReturn(TestHelper.SCHEMA_USER);
        return logMinerEventRow;
    }
}
