package io.debezium.connector.oracle.xstream;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.SourceInfo;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import oracle.sql.NUMBER;
import oracle.streams.StreamsException;
import oracle.streams.XStreamOut;
import oracle.streams.XStreamUtility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/xstream/XstreamStreamingChangeEventSource.class */
/**
 * Streaming change event source that attaches to an Oracle XStream outbound server and
 * receives LCRs through an {@link LcrEventHandler} callback for as long as the supplied
 * context reports that it is running.
 *
 * <p>Offsets passed to {@link #commitOffset(Map)} are not sent to the server directly;
 * they are parked in an {@link AtomicReference} and handed out once by
 * {@link #receivePublishedPosition()}.
 */
public class XstreamStreamingChangeEventSource implements StreamingChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(XstreamStreamingChangeEventSource.class);

    // Literal arguments of XStreamOut.attach(...) / receiveLCRCallback(...) / detach(...).
    // NOTE(review): presumably batch interval, idle timeout and XStreamOut's default mode --
    // confirm against the Oracle XStream Java API before relying on these names.
    private static final int ATTACH_BATCH_INTERVAL = 1;
    private static final int ATTACH_IDLE_TIMEOUT = 1;
    private static final int XSTREAM_MODE = 0;

    private final OracleConnection jdbcConnection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final OracleDatabaseSchema schema;
    private final OracleOffsetContext offsetContext;
    private final String xStreamServerName;
    // Non-null only between a successful attach and the start of the detach in execute();
    // volatile because commitOffset() and getXsOut() read it as well.
    private volatile XStreamOut xsOut;
    private final boolean tablenameCaseInsensitive;
    // Position version passed to XStreamUtility.convertSCNToPosition.
    private final int posVersion;
    // Most recently committed position; cleared by whoever calls receivePublishedPosition().
    private final AtomicReference<PositionAndScn> lcrMessage = new AtomicReference<>();

    /**
     * Immutable pair of an LCR position and an SCN already converted to its raw
     * XStream position bytes. Either member may be {@code null}.
     */
    public static class PositionAndScn {
        public final LcrPosition position;
        public final byte[] scn;

        /**
         * @param lcrPosition the LCR position, stored as given
         * @param bArr the raw position bytes derived from an SCN, stored as given (not copied)
         */
        public PositionAndScn(LcrPosition lcrPosition, byte[] bArr) {
            this.position = lcrPosition;
            this.scn = bArr;
        }
    }

    /**
     * Creates the source. The outbound server name, table-name case-insensitivity and
     * position version are resolved once from the configuration and connection.
     *
     * @param oracleConnectorConfig connector configuration
     * @param oracleOffsetContext offset context providing the start position
     * @param oracleConnection JDBC connection used to attach to the outbound server
     * @param eventDispatcher dispatcher passed on to the LCR handler
     * @param errorHandler receives any throwable raised while streaming
     * @param clock clock passed on to the LCR handler
     * @param oracleDatabaseSchema schema passed on to the LCR handler
     */
    public XstreamStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleOffsetContext oracleOffsetContext, OracleConnection oracleConnection, EventDispatcher<TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema) {
        this.jdbcConnection = oracleConnection;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = oracleDatabaseSchema;
        this.offsetContext = oracleOffsetContext;
        this.xStreamServerName = oracleConnectorConfig.getXoutServerName();
        this.tablenameCaseInsensitive = oracleConnection.getTablenameCaseInsensitivity(oracleConnectorConfig);
        this.posVersion = oracleConnectorConfig.getOracleVersion().getPosVersion();
    }

    /**
     * Attaches to the outbound server, receives LCRs until the context stops running,
     * then detaches.
     *
     * <p>The start position is the stored LCR position when the offset context has one,
     * otherwise the position derived from the stored SCN. Any throwable raised while
     * attaching, streaming or detaching is handed to the error handler instead of being
     * propagated; the detach always runs before the error handler is notified.
     *
     * @param changeEventSourceContext polled before each receive call to decide whether to continue
     * @throws InterruptedException declared by the signature; this implementation routes all
     *         throwables to the error handler
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        try {
            try {
                final byte[] startPosition = this.offsetContext.getLcrPosition() != null
                        ? this.offsetContext.getLcrPosition().getRawPosition()
                        : convertScnToPosition(this.offsetContext.getScn());
                this.xsOut = XStreamOut.attach(this.jdbcConnection.connection(), this.xStreamServerName, startPosition, ATTACH_BATCH_INTERVAL, ATTACH_IDLE_TIMEOUT, XSTREAM_MODE);

                final LcrEventHandler handler = new LcrEventHandler(this.errorHandler, this.dispatcher, this.clock, this.schema, this.offsetContext, this.tablenameCaseInsensitive, this);
                while (changeEventSourceContext.isRunning()) {
                    LOGGER.trace("Receiving LCR");
                    this.xsOut.receiveLCRCallback(handler, XSTREAM_MODE);
                }
            } finally {
                // Single cleanup path for both normal termination and failure.
                detachQuietly();
            }
        } catch (Throwable failure) {
            this.errorHandler.setProducerThrowable(failure);
        }
    }

    /**
     * Records the position found in the given offset map so that it can later be picked up
     * through {@link #receivePublishedPosition()}. Does nothing while no outbound server
     * is attached.
     *
     * @param map offset map; read under {@link SourceInfo#LCR_POSITION_KEY} (as a String)
     *        and {@link SourceInfo#SCN_KEY} (as a Long)
     */
    public void commitOffset(Map<String, ?> map) {
        if (this.xsOut == null) {
            return;
        }
        LOGGER.debug("Sending message to request recording of offsets to Oracle");
        final LcrPosition lcrPosition = LcrPosition.valueOf((String) map.get(SourceInfo.LCR_POSITION_KEY));
        final Long scn = (Long) map.get(SourceInfo.SCN_KEY);
        sendPublishedPosition(lcrPosition, scn);
    }

    /**
     * Detaches from the outbound server if one is attached. The field is cleared before the
     * detach call, so a failed detach is never retried. A {@link StreamsException} is logged
     * and suppressed; anything else propagates to the caller.
     */
    private void detachQuietly() {
        final XStreamOut attached = this.xsOut;
        if (attached == null) {
            return;
        }
        this.xsOut = null;
        try {
            attached.detach(XSTREAM_MODE);
        } catch (StreamsException e) {
            LOGGER.error("Couldn't detach from XStream outbound server {}", this.xStreamServerName, e);
        }
    }

    /**
     * Converts an SCN to raw XStream position bytes using the configured position version.
     *
     * @param j the SCN
     * @return the raw position bytes
     * @throws RuntimeException wrapping the {@link StreamsException} if the conversion fails
     */
    private byte[] convertScnToPosition(long j) {
        try {
            return XStreamUtility.convertSCNToPosition(new NUMBER(j), this.posVersion);
        } catch (StreamsException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the currently attached outbound server handle.
     * NOTE(review): the decompiler reports this accessor was originally package-private.
     *
     * @return the handle, or {@code null} when not attached
     */
    public XStreamOut getXsOut() {
        return this.xsOut;
    }

    /**
     * Stores the given position, replacing any value not yet consumed.
     *
     * @param lcrPosition the LCR position, may be {@code null}
     * @param l the SCN; when {@code null} no SCN bytes are stored
     */
    private void sendPublishedPosition(LcrPosition lcrPosition, Long l) {
        final byte[] scnPosition = l != null ? convertScnToPosition(l.longValue()) : null;
        this.lcrMessage.set(new PositionAndScn(lcrPosition, scnPosition));
    }

    /**
     * Atomically takes the most recently stored position, leaving nothing behind.
     * NOTE(review): the decompiler reports this accessor was originally package-private.
     *
     * @return the stored position, or {@code null} if none was stored since the last call
     */
    public PositionAndScn receivePublishedPosition() {
        return this.lcrMessage.getAndSet(null);
    }
}
