package com.arangodb.kafka;

import com.arangodb.ArangoCollection;
import com.arangodb.ArangoDBException;
import com.arangodb.internal.DocumentFields;
import com.arangodb.kafka.config.ArangoSinkConfig;
import com.arangodb.kafka.conversion.KeyConverter;
import com.arangodb.kafka.conversion.RecordConverter;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.model.DocumentDeleteOptions;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/arangodb/kafka/ArangoWriter.class */
/**
 * Writes Kafka Connect {@link SinkRecord}s to a single ArangoDB collection.
 *
 * <p>A record with a non-null key and a null value is handled as a delete; every other record
 * is converted to a document and inserted. Failures are split into two groups:
 * <ul>
 *   <li><b>data errors</b> ({@link DataException}) – either rethrown, or ignored / handed to the
 *       errant record reporter, depending on the configured tolerance;</li>
 *   <li><b>transient errors</b> ({@code TransientException}) – rethrown wrapped in a
 *       {@link RetriableException} until the configured number of retries is used up, after
 *       which the transient exception itself is rethrown.</li>
 * </ul>
 *
 * <p>The retry counter is plain mutable instance state with no synchronization, so an instance
 * must not be shared between threads.
 */
public class ArangoWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ArangoWriter.class);

    /** ArangoDB error numbers that are always classified as data errors. */
    private static final Set<Integer> DATA_ERROR_NUMS = new HashSet<>(Arrays.asList(
            600, 1208, 1210, 1216, 1221, 1222, 1226, 1233, 1466, 1469, 1504, 1505, 1524, 1542,
            1543, 1561, 1562, 1563, 1569, 1572, 1578, 1593, 1594, 1620, 4001, 4003, 4010));

    /** HTTP response code expected on a "document not found" delete failure. */
    private static final int HTTP_NOT_FOUND = 404;

    /** ArangoDB error number expected on a "document not found" delete failure. */
    private static final int ERROR_DOCUMENT_NOT_FOUND = 1202;

    private final ArangoCollection col;
    /** May be {@code null}; the constructor logs that case and data errors are then only logged. */
    private final ErrantRecordReporter reporter;
    private final SinkTaskContext context;
    private final KeyConverter keyConverter;
    private final RecordConverter converter;
    private final DocumentCreateOptions createOptions;
    private final DocumentDeleteOptions deleteOptions;
    private final boolean deleteEnabled;
    private final int maxRetries;
    private final int retryBackoffMs;
    private final boolean tolerateDataErrors;
    private final boolean logDataErrors;
    /** Retries left for the record currently failing; reset after a success or on exhaustion. */
    private int remainingRetries;
    /** Additional, user-configured error numbers to classify as data errors. */
    private final Set<Integer> extraDataErrorsNums;

    /**
     * Creates a writer bound to one collection and one sink task context.
     *
     * @param arangoSinkConfig source of the create/delete options, retry and error-tolerance settings
     * @param arangoCollection collection that receives the inserts and deletes
     * @param sinkTaskContext  task context used to obtain the errant record reporter and to set
     *                         the retry timeout
     */
    public ArangoWriter(ArangoSinkConfig arangoSinkConfig, ArangoCollection arangoCollection, SinkTaskContext sinkTaskContext) {
        this.createOptions = arangoSinkConfig.getCreateOptions();
        this.deleteOptions = arangoSinkConfig.getDeleteOptions();
        this.deleteEnabled = arangoSinkConfig.isDeleteEnabled();
        this.retryBackoffMs = arangoSinkConfig.getRetryBackoffMs();
        this.maxRetries = arangoSinkConfig.getMaxRetries();
        this.remainingRetries = this.maxRetries;
        this.tolerateDataErrors = arangoSinkConfig.getTolerateDataErrors();
        this.logDataErrors = arangoSinkConfig.getLogDataErrors();
        this.extraDataErrorsNums = arangoSinkConfig.getExtraDataErrorsNums();
        this.col = arangoCollection;
        this.context = sinkTaskContext;
        this.reporter = sinkTaskContext.errantRecordReporter();
        if (this.reporter == null) {
            LOG.info("Errant record reporter not configured.");
        }
        this.keyConverter = new KeyConverter();
        this.converter = new RecordConverter(this.keyConverter);
    }

    /**
     * Processes the given records one by one, in iteration order.
     *
     * @param collection records to write; an empty collection is a no-op
     * @throws DataException      on a data error when data errors are not tolerated
     * @throws RetriableException on a transient error while retries remain
     * @throws ConnectException   (a {@code TransientException}) on a transient error once the
     *                            retries are exhausted
     */
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        LOG.trace("Handling {} record(s)", collection.size());
        for (SinkRecord sinkRecord : collection) {
            try {
                handleRecord(sinkRecord);
            } catch (DataException e) {
                handleDataException(sinkRecord, e);
            } catch (TransientException e) {
                handleTransientException(sinkRecord, e);
            }
        }
    }

    /**
     * Dispatches one record to delete or insert handling and resets the retry counter on success.
     * Every exception raised while handling is funnelled through {@link #wrapException(Exception)}.
     */
    private void handleRecord(SinkRecord sinkRecord) {
        try {
            LOG.trace("Handling record: {}-{}-{}",
                    sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
            // A keyed record without a value is treated as a delete request.
            boolean isDelete = sinkRecord.key() != null && sinkRecord.value() == null;
            if (isDelete) {
                handleDelete(sinkRecord);
            } else {
                handleInsert(sinkRecord);
            }
            LOG.trace("Completed handling record");
            this.remainingRetries = this.maxRetries;
        } catch (Exception e) {
            throw wrapException(e);
        }
    }

    /**
     * Deletes the document addressed by the record key. A "document not found" response is
     * logged at debug level and otherwise ignored.
     *
     * @throws ConnectException if deletes are disabled in the configuration
     */
    private void handleDelete(SinkRecord sinkRecord) {
        if (!this.deleteEnabled) {
            throw new ConnectException("Deletes are not enabled.");
        }
        String key = this.keyConverter.convert(sinkRecord);
        try {
            LOG.trace("Deleting document: {}", key);
            this.col.deleteDocument(key, this.deleteOptions);
        } catch (ArangoDBException e) {
            if (!isDocumentNotFound(e)) {
                throw e;
            }
            LOG.debug("Deleting document not found: {}", key);
        }
    }

    /**
     * Tells whether the exception carries response code 404 together with error number 1202.
     *
     * <p>Both accessors return boxed values. The comparison is written null-safely so that an
     * exception lacking either value is reported as "not a match" instead of raising a
     * {@link NullPointerException} that would hide the original failure.
     */
    private static boolean isDocumentNotFound(ArangoDBException e) {
        return Integer.valueOf(HTTP_NOT_FOUND).equals(e.getResponseCode())
                && Integer.valueOf(ERROR_DOCUMENT_NOT_FOUND).equals(e.getErrorNum());
    }

    /** Converts the record to a document and inserts it using the configured create options. */
    private void handleInsert(SinkRecord sinkRecord) {
        ObjectNode document = this.converter.convert(sinkRecord);
        LOG.trace("Inserting document: {}", document.get(DocumentFields.KEY));
        this.col.insertDocument(document, this.createOptions);
    }

    /**
     * Classifies an exception raised while handling a record.
     *
     * @return the exception itself if it already is a {@link DataException}; a new
     *         {@link DataException} if it is an {@link ArangoDBException} whose error number is
     *         in the built-in or the configured data-error set; otherwise a
     *         {@code TransientException} wrapping it
     */
    private ConnectException wrapException(Exception exc) {
        if (exc instanceof DataException) {
            return (DataException) exc;
        }
        if (exc instanceof ArangoDBException) {
            Integer errorNum = ((ArangoDBException) exc).getErrorNum();
            // An absent error number cannot match a classified code; skipping the lookups also
            // avoids failing on a configured set that rejects null arguments.
            if (errorNum != null
                    && (DATA_ERROR_NUMS.contains(errorNum) || this.extraDataErrorsNums.contains(errorNum))) {
                return new DataException(exc);
            }
        }
        return new TransientException(exc);
    }

    /**
     * Applies the data-error policy: optionally logs the error, rethrows it when data errors
     * are not tolerated, and otherwise reports it to the errant record reporter if one exists.
     */
    private void handleDataException(SinkRecord sinkRecord, DataException dataException) {
        if (this.logDataErrors) {
            LOG.warn("Got data exception while processing record: {}", sinkRecord, dataException);
        }
        if (!this.tolerateDataErrors) {
            throw dataException;
        }
        if (this.reporter == null) {
            LOG.debug("Ignoring exception:", dataException);
        } else {
            LOG.debug("Reporting exception to DLQ:", dataException);
            this.reporter.report(sinkRecord, dataException);
        }
    }

    /**
     * Applies the retry policy for a transient failure. Always throws.
     *
     * <p>While retries remain, one is consumed, the configured backoff is passed to
     * {@link SinkTaskContext#timeout(long)} and a {@link RetriableException} is thrown. Once
     * they are used up, the counter is reset and the transient exception itself is rethrown.
     */
    private void handleTransientException(SinkRecord sinkRecord, TransientException transientException) {
        LOG.debug("Got transient exception while processing record: {}", sinkRecord, transientException);
        if (this.remainingRetries <= 0) {
            // Reset so that a later call starts again with the full retry budget.
            this.remainingRetries = this.maxRetries;
            throw transientException;
        }
        LOG.debug("remaining retries: {}", this.remainingRetries);
        this.remainingRetries--;
        this.context.timeout(this.retryBackoffMs);
        throw new RetriableException(transientException);
    }
}
