package solutions.a2.cdc.oracle;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Statement;
import java.util.Comparator;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import oracle.jdbc.OracleResultSet;
import oracle.sql.BINARY_DOUBLE;
import oracle.sql.BINARY_FLOAT;
import oracle.sql.NUMBER;
import oracle.sql.TIMESTAMP;
import oracle.sql.TIMESTAMPLTZ;
import oracle.sql.TIMESTAMPTZ;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import solutions.a2.cdc.oracle.data.OraTimestamp;
import solutions.a2.cdc.oracle.jmx.OraCdcInitialLoad;
import solutions.a2.cdc.oracle.utils.Lz4Util;
import solutions.a2.kafka.sink.JdbcSinkConnectionPool;
import solutions.a2.utils.ExceptionUtils;

/* loaded from: input_file:solutions/a2/cdc/oracle/OraTable4InitialLoad.class */
public class OraTable4InitialLoad extends OraTable4SourceConnector implements ReadMarshallable, WriteMarshallable {
    private static final Logger LOGGER = LoggerFactory.getLogger(OraTable4InitialLoad.class);
    private static final byte NULL_LENGTH_BYTE = -1;
    private static final short NULL_LENGTH_SHORT = -1;
    private static final int NULL_LENGTH_INT = -1;
    private static final int LOB_CHUNK_SIZE = 16384;
    private static final int ORA_942 = 942;
    private final String pdbName;
    private final Path queueDirectory;
    private final OraCdcInitialLoad metrics;
    private final String sqlSelect;
    private final String tableFqn;
    private final String kafkaTopic;
    private ChronicleQueue tableRows;
    private ExcerptAppender appender;
    private ExcerptTailer tailer;
    private int queueSize;
    private int tailerOffset;
    private OracleResultSet rsMaster;
    private Struct keyStruct;
    private Struct valueStruct;
    private Connection connTzData;

    public OraTable4InitialLoad(Path path, OraTable4LogMiner oraTable4LogMiner, OraCdcInitialLoad oraCdcInitialLoad, OraRdbmsInfo oraRdbmsInfo) throws IOException {
        super(oraTable4LogMiner.getTableOwner(), oraTable4LogMiner.getTableName(), oraTable4LogMiner.getSchemaType());
        LOGGER.trace("BEGIN: create OraCdcTableBuffer");
        this.pdbName = oraTable4LogMiner.getPdbName();
        this.allColumns = oraTable4LogMiner.getAllColumns();
        this.pkColumns = oraTable4LogMiner.getPkColumns();
        this.schema = oraTable4LogMiner.schema;
        this.keySchema = oraTable4LogMiner.keySchema;
        this.valueSchema = oraTable4LogMiner.valueSchema;
        this.sourcePartition = oraTable4LogMiner.sourcePartition;
        this.metrics = oraCdcInitialLoad;
        this.tableFqn = oraTable4LogMiner.fqn();
        this.kafkaTopic = oraTable4LogMiner.getKafkaTopic();
        this.rdbmsInfo = oraRdbmsInfo;
        setRowLevelScn(oraTable4LogMiner.isRowLevelScn());
        StringBuilder sb = new StringBuilder(512);
        sb.append("select ");
        for (int i = 0; i < this.allColumns.size(); i++) {
            OraColumn oraColumn = this.allColumns.get(i);
            if (oraColumn.getColumnName().equals(OraColumn.ROWID_KEY)) {
                sb.append("ROWID as ");
                sb.append(OraColumn.ROWID_KEY);
            } else {
                sb.append(oraColumn.getColumnName());
            }
            if (i < this.allColumns.size() - 1) {
                sb.append(",");
            }
        }
        sb.append(" from ");
        sb.append(this.tableOwner);
        sb.append(".");
        sb.append(this.tableName);
        if (isRowLevelScn()) {
            sb.append(" where ORA_ROWSCN < ?");
        }
        this.sqlSelect = sb.toString();
        LOGGER.debug("{} will be used for initial data load.", this.sqlSelect);
        this.queueDirectory = Files.createTempDirectory(path, StringUtils.replace(this.tableFqn, ":", "-") + ".", new FileAttribute[0]);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Created queue directory {} .", this.queueDirectory.toString());
        }
        try {
            this.tableRows = ChronicleQueue.singleBuilder(this.queueDirectory).build();
            this.tailer = this.tableRows.createTailer();
            this.appender = this.tableRows.acquireAppender();
            this.queueSize = 0;
            this.tailerOffset = 0;
            LOGGER.trace("END: create OraCdcTableBuffer");
        } catch (Exception e) {
            LOGGER.error("Unable to create Chronicle Queue!");
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
            throw new IOException(e);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0031. Please report as an issue. */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v119, types: [java.sql.Clob] */
    public void writeMarshallable(WireOut wireOut) {
        Bytes bytes = wireOut.bytes();
        for (int i = 0; i < this.allColumns.size(); i++) {
            try {
                OraColumn oraColumn = this.allColumns.get(i);
                String columnName = oraColumn.getColumnName();
                switch (oraColumn.getJdbcType()) {
                    case -15:
                    case -9:
                        bytes.writeUtf8(this.rsMaster.getNString(columnName));
                    case -8:
                        RowId rowId = this.rsMaster.getRowId(columnName);
                        bytes.write8bit(rowId == null ? (String) null : rowId.toString());
                    case -6:
                    case -5:
                    case 2:
                    case 3:
                    case JdbcSinkConnectionPool.DB_TYPE_MSSQL /* 4 */:
                    case OraCdcV$LogmnrContents.DDL /* 5 */:
                        NUMBER number = this.rsMaster.getNUMBER(columnName);
                        if (this.rsMaster.wasNull()) {
                            bytes.writeByte((byte) -1);
                        } else {
                            byte[] bytes2 = number.getBytes();
                            bytes.writeByte((byte) bytes2.length);
                            bytes.write(bytes2);
                        }
                    case -2:
                        byte[] bytes3 = this.rsMaster.getBytes(columnName);
                        if (bytes3 == null) {
                            bytes.writeShort((short) -1);
                        } else {
                            bytes.writeShort((short) bytes3.length);
                            bytes.write(bytes3);
                        }
                    case 1:
                    case OraRdbmsInfo.CDB_INTRODUCED /* 12 */:
                        bytes.writeUtf8(this.rsMaster.getString(columnName));
                    case OraCdcV$LogmnrContents.START /* 6 */:
                        if (oraColumn.isBinaryFloatDouble().booleanValue()) {
                            r17 = this.rsMaster.wasNull() ? null : new BINARY_FLOAT(this.rsMaster.getFloat(columnName)).getBytes();
                        } else {
                            NUMBER number2 = this.rsMaster.getNUMBER(columnName);
                            if (!this.rsMaster.wasNull()) {
                                r17 = number2.getBytes();
                            }
                        }
                        if (r17 == null) {
                            bytes.writeByte((byte) -1);
                        } else {
                            bytes.writeByte((byte) r17.length);
                            bytes.write(r17);
                        }
                    case 8:
                        if (oraColumn.isBinaryFloatDouble().booleanValue()) {
                            r18 = this.rsMaster.wasNull() ? null : new BINARY_DOUBLE(this.rsMaster.getDouble(columnName)).getBytes();
                        } else {
                            NUMBER number3 = this.rsMaster.getNUMBER(columnName);
                            if (!this.rsMaster.wasNull()) {
                                r18 = number3.getBytes();
                            }
                        }
                        if (r18 == null) {
                            bytes.writeByte((byte) -1);
                        } else {
                            bytes.writeByte((byte) r18.length);
                            bytes.write(r18);
                        }
                    case 91:
                    case 93:
                        TIMESTAMP timestamp = this.rsMaster.getTIMESTAMP(columnName);
                        if (this.rsMaster.wasNull()) {
                            bytes.writeByte((byte) -1);
                        } else {
                            byte[] bytes4 = timestamp.getBytes();
                            bytes.writeByte((byte) bytes4.length);
                            bytes.write(bytes4);
                        }
                    case 2004:
                        Blob blob = this.rsMaster.getBlob(columnName);
                        if (this.rsMaster.wasNull() || blob.length() < 1) {
                            bytes.writeInt(-1);
                        } else {
                            if (2147483647L < blob.length()) {
                                LOGGER.error("Unable to process BLOB column {}({}) with length ({}) greater than Integer.MAX_VALUE ({})", new Object[]{fqn(), columnName, Long.valueOf(blob.length()), Integer.MAX_VALUE});
                                throw new SQLException("Unable to process BLOB column with length " + blob.length() + " bytes!");
                            }
                            try {
                                InputStream binaryStream = blob.getBinaryStream();
                                try {
                                    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                                    try {
                                        byte[] bArr = new byte[LOB_CHUNK_SIZE];
                                        while (true) {
                                            int read = binaryStream.read(bArr, 0, bArr.length);
                                            if (read != -1) {
                                                byteArrayOutputStream.write(bArr, 0, read);
                                            } else {
                                                bytes.writeInt(byteArrayOutputStream.size());
                                                bytes.write(byteArrayOutputStream.toByteArray());
                                                byteArrayOutputStream.close();
                                                if (binaryStream != null) {
                                                    binaryStream.close();
                                                }
                                            }
                                        }
                                    } finally {
                                    }
                                } finally {
                                }
                            } catch (IOException e) {
                                LOGGER.error("IO Error while processing BLOB column {}({})", fqn(), columnName);
                                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
                                throw new ConnectException(e);
                            }
                        }
                        break;
                    case 2005:
                    case 2011:
                        NClob clob = oraColumn.getJdbcType() == 2005 ? this.rsMaster.getClob(columnName) : this.rsMaster.getNClob(columnName);
                        if (this.rsMaster.wasNull() || clob.length() < 1) {
                            bytes.writeInt(-1);
                        } else {
                            if (2147483647L < clob.length()) {
                                Logger logger = LOGGER;
                                Object[] objArr = new Object[5];
                                objArr[0] = oraColumn.getJdbcType() == 2005 ? "CLOB" : "NCLOB";
                                objArr[1] = fqn();
                                objArr[2] = columnName;
                                objArr[3] = Long.valueOf(clob.length());
                                objArr[4] = Integer.MAX_VALUE;
                                logger.error("Unable to process {} column {}({}) with length ({}) greater than Integer.MAX_VALUE ({})", objArr);
                                throw new SQLException("Unable to process " + (oraColumn.getJdbcType() == 2005 ? "CLOB" : "NCLOB") + "column with length " + clob.length() + " chars!");
                            }
                            try {
                                Reader characterStream = clob.getCharacterStream();
                                try {
                                    StringBuilder sb = new StringBuilder((int) clob.length());
                                    char[] cArr = new char[LOB_CHUNK_SIZE];
                                    while (true) {
                                        int read2 = characterStream.read(cArr, 0, cArr.length);
                                        if (read2 != -1) {
                                            sb.append(cArr, 0, read2);
                                        } else {
                                            byte[] compress = Lz4Util.compress(sb.toString());
                                            bytes.writeInt(compress.length);
                                            bytes.write(compress);
                                            if (characterStream != null) {
                                                characterStream.close();
                                            }
                                        }
                                    }
                                } finally {
                                }
                            } catch (IOException e2) {
                                Logger logger2 = LOGGER;
                                Object[] objArr2 = new Object[3];
                                objArr2[0] = oraColumn.getJdbcType() == 2005 ? "CLOB" : "NCLOB";
                                objArr2[1] = fqn();
                                objArr2[2] = columnName;
                                logger2.error("IO Error while processing {} column {}({})", objArr2);
                                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e2));
                                throw new ConnectException(e2);
                            }
                        }
                        break;
                    case 2009:
                        SQLXML sqlxml = this.rsMaster.getSQLXML(columnName);
                        if (this.rsMaster.wasNull()) {
                            bytes.writeInt(-1);
                        } else {
                            String string = sqlxml.getString();
                            if (string.length() < 1) {
                                bytes.writeInt(-1);
                            } else {
                                byte[] compress2 = Lz4Util.compress(string);
                                bytes.writeInt(compress2.length);
                                bytes.write(compress2);
                            }
                        }
                    case 2014:
                        if (oraColumn.isLocalTimeZone().booleanValue()) {
                            TIMESTAMPLTZ timestampltz = this.rsMaster.getTIMESTAMPLTZ(columnName);
                            r15 = timestampltz != null ? timestampltz.getBytes() : null;
                        } else {
                            TIMESTAMPTZ timestamptz = this.rsMaster.getTIMESTAMPTZ(columnName);
                            if (timestamptz != null) {
                                r15 = timestamptz.getBytes();
                            }
                        }
                        if (r15 == null) {
                            bytes.writeByte((byte) -1);
                        } else {
                            bytes.writeByte((byte) r15.length);
                            bytes.write(r15);
                        }
                    default:
                        throw new SQLException("Unsupported JDBC Type " + oraColumn.getJdbcType());
                }
            } catch (SQLException e3) {
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e3));
                throw new ConnectException(e3);
            }
        }
    }

    public void readMarshallable(WireIn wireIn) throws IORuntimeException {
        Bytes<?> bytes = wireIn.bytes();
        for (int i = 0; i < this.allColumns.size(); i++) {
            try {
                OraColumn oraColumn = this.allColumns.get(i);
                String columnName = oraColumn.getColumnName();
                switch (oraColumn.getJdbcType()) {
                    case -15:
                    case -9:
                    case 1:
                    case OraRdbmsInfo.CDB_INTRODUCED /* 12 */:
                        r13 = bytes.readUtf8();
                        break;
                    case -8:
                        r13 = bytes.read8bit();
                        break;
                    case -6:
                        NUMBER readNUMBER = readNUMBER(bytes);
                        if (readNUMBER != null) {
                            r13 = Byte.valueOf(readNUMBER.byteValue());
                            break;
                        }
                        break;
                    case -5:
                        NUMBER readNUMBER2 = readNUMBER(bytes);
                        if (readNUMBER2 != null) {
                            r13 = Long.valueOf(readNUMBER2.longValue());
                            break;
                        }
                        break;
                    case -2:
                        int readShort = bytes.readShort();
                        if (readShort != -1) {
                            byte[] bArr = new byte[readShort];
                            bytes.read(bArr);
                            r13 = bArr;
                            break;
                        }
                        break;
                    case 2:
                        int readByte = bytes.readByte();
                        if (readByte != -1) {
                            byte[] bArr2 = new byte[readByte];
                            bytes.read(bArr2);
                            r13 = bArr2;
                            break;
                        }
                        break;
                    case 3:
                        NUMBER readNUMBER3 = readNUMBER(bytes);
                        if (readNUMBER3 != null) {
                            r13 = readNUMBER3.bigDecimalValue().setScale(oraColumn.getDataScale().intValue());
                            break;
                        }
                        break;
                    case JdbcSinkConnectionPool.DB_TYPE_MSSQL /* 4 */:
                        NUMBER readNUMBER4 = readNUMBER(bytes);
                        if (readNUMBER4 != null) {
                            r13 = Integer.valueOf(readNUMBER4.intValue());
                            break;
                        }
                        break;
                    case OraCdcV$LogmnrContents.DDL /* 5 */:
                        NUMBER readNUMBER5 = readNUMBER(bytes);
                        if (readNUMBER5 != null) {
                            r13 = Short.valueOf(readNUMBER5.shortValue());
                            break;
                        }
                        break;
                    case OraCdcV$LogmnrContents.START /* 6 */:
                        if (oraColumn.isBinaryFloatDouble().booleanValue()) {
                            int readByte2 = bytes.readByte();
                            if (readByte2 != -1) {
                                byte[] bArr3 = new byte[readByte2];
                                bytes.read(bArr3);
                                r13 = Float.valueOf(new BINARY_FLOAT(bArr3).floatValue());
                                break;
                            }
                        } else {
                            NUMBER readNUMBER6 = readNUMBER(bytes);
                            r13 = readNUMBER6 != null ? Float.valueOf(readNUMBER6.floatValue()) : null;
                            break;
                        }
                        break;
                    case 8:
                        if (oraColumn.isBinaryFloatDouble().booleanValue()) {
                            int readByte3 = bytes.readByte();
                            if (readByte3 != -1) {
                                byte[] bArr4 = new byte[readByte3];
                                bytes.read(bArr4);
                                r13 = Double.valueOf(new BINARY_DOUBLE(bArr4).doubleValue());
                                break;
                            }
                        } else {
                            NUMBER readNUMBER7 = readNUMBER(bytes);
                            r13 = readNUMBER7 != null ? Double.valueOf(readNUMBER7.doubleValue()) : null;
                            break;
                        }
                        break;
                    case 91:
                    case 93:
                        int readByte4 = bytes.readByte();
                        if (readByte4 != -1) {
                            byte[] bArr5 = new byte[readByte4];
                            bytes.read(bArr5);
                            r13 = new TIMESTAMP(bArr5).timestampValue();
                            break;
                        }
                        break;
                    case 2004:
                    case 2005:
                    case 2009:
                    case 2011:
                        int readInt = bytes.readInt();
                        if (readInt != -1) {
                            byte[] bArr6 = new byte[readInt];
                            bytes.read(bArr6);
                            r13 = bArr6;
                            break;
                        } else {
                            r13 = new byte[0];
                            break;
                        }
                    case 2014:
                        int readByte5 = bytes.readByte();
                        if (readByte5 != -1) {
                            byte[] bArr7 = new byte[readByte5];
                            bytes.read(bArr7);
                            r13 = oraColumn.isLocalTimeZone().booleanValue() ? OraTimestamp.ISO_8601_FMT.format(new TIMESTAMPLTZ(bArr7).offsetDateTimeValue(this.connTzData)) : OraTimestamp.ISO_8601_FMT.format(new TIMESTAMPTZ(bArr7).offsetDateTimeValue(this.connTzData));
                            break;
                        }
                        break;
                    default:
                        throw new SQLException("Unsupported JDBC Type " + oraColumn.getJdbcType() + " for column " + columnName);
                }
                if (this.keyStruct != null && this.pkColumns.containsKey(columnName)) {
                    try {
                        this.keyStruct.put(columnName, r13);
                    } catch (DataException e) {
                        LOGGER.error("Data exception while performing initial load for table {}, COLUMN={}, VALUE={}", new Object[]{this.tableFqn, columnName, r13});
                        LOGGER.error("Primary key column(s) for table {}:", this.tableFqn);
                        this.pkColumns.forEach((str, oraColumn2) -> {
                            LOGGER.error("\t" + oraColumn2.getColumnName());
                        });
                        LOGGER.error("Key schema elements for table {}:", this.tableFqn);
                        this.keySchema.fields().forEach(field -> {
                            LOGGER.error("\t" + field.name());
                        });
                        throw new DataException(e);
                    }
                }
                if ((this.schemaType == 2 && !this.pkColumns.containsKey(columnName)) || this.schemaType == 3 || this.schemaType == 1) {
                    this.valueStruct.put(columnName, r13);
                }
            } catch (SQLException e2) {
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e2));
                throw new ConnectException(e2);
            }
        }
    }

    private NUMBER readNUMBER(Bytes<?> bytes) {
        int readByte = bytes.readByte();
        if (readByte == -1) {
            return null;
        }
        byte[] bArr = new byte[readByte];
        bytes.read(bArr);
        return new NUMBER(bArr);
    }

    public boolean usesSelfDescribingMessage() {
        return super.usesSelfDescribingMessage();
    }

    public void readTableData(Long l, CountDownLatch countDownLatch, AtomicBoolean atomicBoolean, BlockingQueue<OraTable4InitialLoad> blockingQueue, OraConnectionObjects oraConnectionObjects) {
        this.metrics.startSelectTable(this.tableFqn);
        boolean z = false;
        try {
            Connection connection = oraConnectionObjects.getConnection();
            try {
                this.connTzData = connection;
                if (this.pdbName != null) {
                    Statement createStatement = connection.createStatement();
                    createStatement.execute("alter session set CONTAINER=" + this.pdbName);
                    createStatement.close();
                }
                PreparedStatement prepareStatement = connection.prepareStatement(this.sqlSelect, 1003, 1007);
                if (isRowLevelScn()) {
                    prepareStatement.setLong(1, l.longValue());
                    LOGGER.info("Table {} initial load (read phase) up to SCN {} started.", this.tableFqn, l);
                } else {
                    LOGGER.info("Table {} (DEPENDENCY='DISABLED') initial load (read phase) started.", this.tableFqn);
                }
                long nanoTime = System.nanoTime();
                this.rsMaster = prepareStatement.executeQuery();
                while (this.rsMaster.next() && atomicBoolean.get()) {
                    this.appender.writeDocument(this);
                    this.queueSize++;
                }
                if (atomicBoolean.get()) {
                    this.rsMaster.close();
                    this.rsMaster = null;
                    prepareStatement.close();
                    this.metrics.finishSelectTable(this.tableFqn, this.queueSize, this.queueSize * this.allColumns.size(), System.nanoTime() - nanoTime);
                    z = true;
                    LOGGER.info("Table {} initial load (read phase) completed. {} rows read.", this.tableFqn, Integer.valueOf(this.queueSize));
                    if (this.pdbName != null) {
                        Statement createStatement2 = connection.createStatement();
                        createStatement2.execute("alter session set CONTAINER=" + this.rdbmsInfo.getPdbName());
                        createStatement2.close();
                    }
                }
                if (connection != null) {
                    connection.close();
                }
            } finally {
            }
        } catch (SQLException e) {
            if (e.getErrorCode() == 942) {
                LOGGER.error("ORA-942!\nPlease grant select on table {} for user running connector!", this.tableFqn);
            } else {
                if (atomicBoolean.get()) {
                    LOGGER.error("Error while performing initial load of {}!", this.tableFqn);
                    LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
                    throw new ConnectException(e);
                }
                z = false;
            }
        }
        if (z) {
            try {
                blockingQueue.put(this);
            } catch (InterruptedException e2) {
                LOGGER.error(ExceptionUtils.getExceptionStackTrace(e2));
                throw new ConnectException(e2);
            }
        } else {
            LOGGER.warn("Incomplete initial load data for {} are removed.", this.tableFqn);
            close();
        }
        countDownLatch.countDown();
    }

    public SourceRecord getSourceRecord() {
        long nanoTime = System.nanoTime();
        if (this.schemaType != 3) {
            this.keyStruct = new Struct(this.keySchema);
        }
        this.valueStruct = new Struct(this.valueSchema);
        boolean readDocument = this.tailer.readDocument(this);
        this.tailerOffset++;
        if (!readDocument) {
            return null;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("ROWNUM", Integer.valueOf(this.tailerOffset));
        SourceRecord sourceRecord = null;
        if (this.schemaType == 1) {
            long currentTimeMillis = System.currentTimeMillis();
            Struct struct = new Struct(this.schema);
            struct.put("source", this.rdbmsInfo.getStruct(this.tableFqn, this.pdbName, this.tableOwner, this.tableName, 0L, currentTimeMillis, "", 0L, ""));
            struct.put("before", this.keyStruct);
            struct.put("after", this.valueStruct);
            struct.put("op", "c");
            struct.put("ts_ms", Long.valueOf(currentTimeMillis));
            sourceRecord = new SourceRecord(this.sourcePartition, hashMap, this.kafkaTopic, this.schema, struct);
        } else {
            if (this.schemaType == 2) {
                sourceRecord = new SourceRecord(this.sourcePartition, hashMap, this.kafkaTopic, this.keySchema, this.keyStruct, this.valueSchema, this.valueStruct);
            } else if (this.schemaType == 3) {
                sourceRecord = new SourceRecord(this.sourcePartition, hashMap, this.kafkaTopic, this.valueSchema, this.valueStruct);
            }
            sourceRecord.headers().addString("op", "c");
        }
        this.metrics.addSendInfo(this.allColumns.size(), System.nanoTime() - nanoTime);
        return sourceRecord;
    }

    public String fqn() {
        return this.tableFqn;
    }

    public void close() {
        LOGGER.trace("Closing Cronicle Queue and deleting files.");
        if (this.tableRows != null) {
            this.tableRows.close();
        }
        this.tableRows = null;
        try {
            Files.walk(this.queueDirectory, new FileVisitOption[0]).sorted(Comparator.reverseOrder()).map((v0) -> {
                return v0.toFile();
            }).forEach((v0) -> {
                v0.delete();
            });
        } catch (IOException e) {
            LOGGER.error("Unable to delete Cronicle Queue files.");
            LOGGER.error(ExceptionUtils.getExceptionStackTrace(e));
        }
    }

    public int length() {
        return this.queueSize;
    }

    public int offset() {
        return this.tailerOffset;
    }

    public Path getPath() {
        return this.queueDirectory;
    }
}
