package io.debezium.connector.oracle.xstream;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.StreamingAdapter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import oracle.sql.NUMBER;
import oracle.streams.StreamsException;
import oracle.streams.XStreamOut;
import oracle.streams.XStreamUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.class */
/**
 * Streaming change event source that attaches to an Oracle XStream outbound server and hands the
 * received LCRs to an {@link LcrEventHandler}.
 *
 * <p>The source opens its own {@link OracleConnection} for the XStream session (built from the
 * configuration of the connection passed to the constructor), attaches to the outbound server named
 * by {@link OracleConnectorConfig#getXoutServerName()}, and keeps receiving LCRs until the
 * {@link ChangeEventSource.ChangeEventSourceContext} reports that it is no longer running.
 *
 * <p>Offsets committed through {@link #commitOffset(Map, Map)} are not sent to Oracle directly; they
 * are parked in an {@link AtomicReference} and picked up through {@link #receivePublishedPosition()}.
 */
public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(XstreamStreamingChangeEventSource.class);

    /** Maximum number of attempts made to attach to the outbound server. */
    private static final int DEFAULT_MAX_ATTACH_RETRIES = 10;

    // NOTE(review): not referenced anywhere in this class; presumably meant as a delay between attach
    // attempts. No delay is applied today - confirm the intended behaviour before wiring it in.
    private static final int DEFAULT_MAX_ATTACH_RETRY_DELAY_SECONDS = 10;

    private final OracleConnectorConfig connectorConfig;
    private final OracleConnection jdbcConnection;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final OracleDatabaseSchema schema;
    private final XStreamStreamingChangeEventSourceMetrics streamingMetrics;
    private final String xStreamServerName;

    /** Currently attached outbound server handle, or {@code null} when not attached. */
    private volatile XStreamOut xsOut;

    /** Position version passed to {@code XStreamUtility.convertSCNToPosition}; see {@link #resolvePosVersion}. */
    private final int posVersion;

    /** Most recent position published by {@link #commitOffset(Map, Map)} that has not been consumed yet. */
    private final AtomicReference<PositionAndScn> lcrMessage = new AtomicReference<>();

    private OracleOffsetContext effectiveOffset;

    /**
     * Immutable holder pairing an LCR position with the raw position bytes derived from an SCN.
     * {@link #scn} is {@code null} when no SCN was available at publish time.
     */
    public static class PositionAndScn {
        public final LcrPosition position;
        public final byte[] scn;

        public PositionAndScn(LcrPosition lcrPosition, byte[] bArr) {
            this.position = lcrPosition;
            this.scn = bArr;
        }
    }

    /**
     * Creates the streaming source.
     *
     * <p>The outbound server name is read from {@code oracleConnectorConfig} and the position version
     * is resolved from the database version reported by {@code oracleConnection}.
     */
    public XstreamStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, XStreamStreamingChangeEventSourceMetrics xStreamStreamingChangeEventSourceMetrics) {
        this.connectorConfig = oracleConnectorConfig;
        this.jdbcConnection = oracleConnection;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = oracleDatabaseSchema;
        this.streamingMetrics = xStreamStreamingChangeEventSourceMetrics;
        this.xStreamServerName = oracleConnectorConfig.getXoutServerName();
        this.posVersion = resolvePosVersion(oracleConnection, oracleConnectorConfig);
    }

    /**
     * Records the offset this source starts from.
     *
     * @param oracleOffsetContext the stored offset, or {@code null} to start from an empty context
     */
    public void init(OracleOffsetContext oracleOffsetContext) throws InterruptedException {
        this.effectiveOffset = oracleOffsetContext == null ? emptyContext() : oracleOffsetContext;
    }

    /** Builds an offset context with no pending snapshot transactions and fresh transaction/incremental-snapshot contexts. */
    private OracleOffsetContext emptyContext() {
        return OracleOffsetContext.create()
                .logicalName(this.connectorConfig)
                .snapshotPendingTransactions(Collections.emptyMap())
                .transactionContext(new TransactionContext())
                .incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext())
                .build();
    }

    /**
     * Attaches to the XStream outbound server and receives LCRs until the context stops running.
     *
     * <p>Any {@link Throwable} raised while connecting, attaching, receiving, detaching or closing the
     * connection is handed to the {@link ErrorHandler} instead of being propagated. The outbound
     * server is always detached and the dedicated connection is always closed, on both the normal and
     * the failure path.
     *
     * @param changeEventSourceContext controls the running/paused state of the loop
     * @param oraclePartition partition passed to the event handler and heartbeat dispatch
     * @param oracleOffsetContext offset to start from; its LCR position is preferred over its SCN
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) throws InterruptedException {
        this.effectiveOffset = oracleOffsetContext;

        boolean tableNamesCaseInsensitive = StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE
                .equals(this.connectorConfig.getAdapter().getTableNameCaseSensitivity(this.jdbcConnection));
        LcrEventHandler lcrEventHandler = new LcrEventHandler(this.connectorConfig, this.errorHandler, this.dispatcher, this.clock, this.schema,
                oraclePartition, oracleOffsetContext, tableNamesCaseInsensitive, this, this.streamingMetrics);

        // try-with-resources guarantees the dedicated XStream connection is closed even when
        // attaching or receiving fails; previously it was only closed on the success path.
        try (OracleConnection xsConnection = new OracleConnection(this.jdbcConnection.config())) {
            try {
                this.xsOut = performAttachWithRetries(xsConnection, resolveStartPosition(oracleOffsetContext));
                if (this.xsOut == null) {
                    throw new DebeziumException("Failed to attach to the Oracle XStream outbound server");
                }

                while (changeEventSourceContext.isRunning()) {
                    LOGGER.trace("Receiving LCR");
                    this.xsOut.receiveLCRCallback(lcrEventHandler, 0);
                    this.dispatcher.dispatchHeartbeatEvent(oraclePartition, oracleOffsetContext);

                    if (changeEventSourceContext.isPaused()) {
                        LOGGER.info("Streaming will now pause");
                        changeEventSourceContext.streamingPaused();
                        changeEventSourceContext.waitSnapshotCompletion();
                        LOGGER.info("Streaming resumed");
                    }
                }
            } finally {
                // Runs before the connection is closed by the enclosing try-with-resources.
                detachFromOutboundServer();
            }
        } catch (Throwable t) {
            this.errorHandler.setProducerThrowable(t);
        }
    }

    /**
     * Determines the raw position to attach at: the stored LCR position when present, otherwise the
     * position converted from the stored SCN.
     */
    private byte[] resolveStartPosition(OracleOffsetContext offsetContext) {
        String lcrPosition = offsetContext.getLcrPosition();
        if (lcrPosition != null) {
            return LcrPosition.valueOf(lcrPosition).getRawPosition();
        }
        return convertScnToPosition(offsetContext.getScn());
    }

    /**
     * Detaches from the outbound server if currently attached and clears {@link #xsOut}.
     * A {@link StreamsException} during detach is logged and not rethrown.
     */
    private void detachFromOutboundServer() {
        XStreamOut attached = this.xsOut;
        if (attached == null) {
            return;
        }
        // Clear the field before detaching so commitOffset()/getXsOut() stop seeing the handle.
        this.xsOut = null;
        try {
            attached.detach(0);
        } catch (StreamsException e) {
            LOGGER.error("Couldn't detach from XStream outbound server {}", this.xStreamServerName, e);
        }
    }

    /**
     * Publishes the committed offset so it can be picked up via {@link #receivePublishedPosition()}.
     * Does nothing while no outbound server is attached.
     *
     * @param map not used by this implementation
     * @param map2 offset map; the {@code "lcr_position"} and {@code "scn"} entries are read
     */
    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
        if (this.xsOut == null) {
            return;
        }
        LOGGER.debug("Sending message to request recording of offsets to Oracle");
        LcrPosition lcrPosition = LcrPosition.valueOf((String) map2.get("lcr_position"));
        Scn scn = OracleOffsetContext.getScnFromOffsetMapByKey(map2, "scn");
        sendPublishedPosition(lcrPosition, scn);
    }

    /**
     * Returns the offset context last set by {@link #init} or {@link #execute}.
     *
     * <p>Decompiler note: this method was renamed from {@code getOffsetContext} (merged with its
     * bridge method); the name is kept unchanged here.
     */
    public OracleOffsetContext m441getOffsetContext() {
        return this.effectiveOffset;
    }

    /**
     * Attaches to the outbound server, making up to {@link #DEFAULT_MAX_ATTACH_RETRIES} attempts.
     *
     * <p>A {@link StreamsException} is rethrown immediately when it is not retriable (see
     * {@link #isAttachExceptionRetriable}) or when the last attempt has been used; otherwise the
     * attach is attempted again right away.
     *
     * @return whatever {@code XStreamOut.attach} returned for the successful attempt
     * @throws Exception the last {@link StreamsException}, or anything raised while obtaining the JDBC connection
     */
    private XStreamOut performAttachWithRetries(OracleConnection oracleConnection, byte[] bArr) throws Exception {
        for (int attempt = 1; attempt <= DEFAULT_MAX_ATTACH_RETRIES; attempt++) {
            try {
                return XStreamOut.attach(oracleConnection.connection(), this.xStreamServerName, bArr, 1, 1, 0);
            } catch (StreamsException e) {
                boolean lastAttempt = attempt == DEFAULT_MAX_ATTACH_RETRIES;
                if (lastAttempt) {
                    LOGGER.warn("Failed to attach to outbound server with max attempts", e);
                }
                if (lastAttempt || !isAttachExceptionRetriable(e)) {
                    throw e;
                }
                LOGGER.warn("Failed to attach to outbound server, retrying: {}", e.getMessage());
            }
        }
        // Only reachable if the retry limit were configured as zero or less.
        return null;
    }

    /**
     * Tells whether a failed attach is worth retrying: error code 26653 or 23656, or a message
     * containing one of the known "server not started yet" phrases. A {@code null} message is treated
     * as non-matching rather than causing a {@link NullPointerException} that would hide the original failure.
     */
    private boolean isAttachExceptionRetriable(StreamsException streamsException) {
        int errorCode = streamsException.getErrorCode();
        if (errorCode == 26653 || errorCode == 23656) {
            return true;
        }
        String message = streamsException.getMessage();
        return message != null
                && (message.contains("did not start properly and is currently in state")
                        || message.contains("Timeout occurred while starting XStream process"));
    }

    /**
     * Converts an SCN to a raw XStream position using the resolved {@link #posVersion}.
     *
     * @throws RuntimeException wrapping the {@link SQLException} or {@link StreamsException} raised by the conversion
     */
    private byte[] convertScnToPosition(Scn scn) {
        try {
            return XStreamUtility.convertSCNToPosition(new NUMBER(scn.toString(), 0), this.posVersion);
        } catch (SQLException | StreamsException e) {
            throw new RuntimeException("Failed to convert SCN " + scn + " to an XStream position", e);
        }
    }

    /**
     * Returns the currently attached outbound server handle, or {@code null} when not attached.
     * (Originally package-private; visibility was widened by the decompiler.)
     */
    public XStreamOut getXsOut() {
        return this.xsOut;
    }

    /** Stores the given position (and the SCN converted to raw position bytes, when non-null) as the pending published position, replacing any previous one. */
    private void sendPublishedPosition(LcrPosition lcrPosition, Scn scn) {
        byte[] scnPosition = scn != null ? convertScnToPosition(scn) : null;
        this.lcrMessage.set(new PositionAndScn(lcrPosition, scnPosition));
    }

    /**
     * Returns the pending published position and clears it atomically.
     * (Originally package-private; visibility was widened by the decompiler.)
     *
     * @return the pending position, or {@code null} when nothing has been published since the last call
     */
    public PositionAndScn receivePublishedPosition() {
        return this.lcrMessage.getAndSet(null);
    }

    /**
     * Picks the position version from the database version: {@code 1} for major version 11 and for
     * major version 12 with a maintenance release below 2, {@code 2} for everything else.
     *
     * @param oracleConnectorConfig not used by this implementation
     */
    private static int resolvePosVersion(OracleConnection oracleConnection, OracleConnectorConfig oracleConnectorConfig) {
        OracleDatabaseVersion oracleVersion = oracleConnection.getOracleVersion();
        int major = oracleVersion.getMajor();
        boolean usesV1 = major == 11 || (major == 12 && oracleVersion.getMaintenance() < 2);
        return usesV1 ? 1 : 2;
    }
}
