package com.arangodb.kafka;

import com.arangodb.ArangoCollection;
import com.arangodb.ArangoDBException;
import com.arangodb.entity.ErrorEntity;
import com.arangodb.internal.DocumentFields;
import com.arangodb.kafka.config.ArangoSinkConfig;
import com.arangodb.kafka.conversion.KeyConverter;
import com.arangodb.kafka.conversion.RecordConverter;
import com.arangodb.model.DocumentCreateOptions;
import com.arangodb.model.DocumentDeleteOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Writes Kafka Connect sink records to an ArangoDB collection in batches.
 *
 * <p>Records handed to {@link #put(Collection)} are split into consecutive batches; each batch
 * holds records of a single operation type (insert or delete) and at most {@code batchSize}
 * records. Failures are classified either as data errors (logged and, when tolerated, reported
 * to the errant record reporter if one is available) or as transient errors (signalled with a
 * {@link RetriableException} while retries remain).
 *
 * <p>The writer keeps mutable progress state ({@code currentOffset}, {@code remainingRetries},
 * {@code errorRecord}) between calls and performs no synchronization of its own.
 */
public class ArangoWriter {
    private static final Logger LOG = LoggerFactory.getLogger(ArangoWriter.class);

    /** Server error numbers that are always classified as data errors. */
    private static final Set<Integer> DATA_ERROR_NUMS = new HashSet<>(Arrays.asList(1208, 1210, 1216, 1221, 1222, 1226, 1233, 1466, 1469, 1504, 1505, 1524, 1542, 1543, 1561, 1562, 1563, 1569, 1572, 1578, 1593, 1594, 1620, 4001, 4003, 4010));

    /** Error number returned for a delete whose document does not exist; such results are skipped. */
    private static final int DOCUMENT_NOT_FOUND_ERROR_NUM = 1202;

    private final ArangoCollection col;
    /** May be {@code null} when the task context provides no errant record reporter. */
    private final ErrantRecordReporter reporter;
    private final SinkTaskContext context;
    private final KeyConverter keyConverter;
    private final RecordConverter converter;
    private final DocumentCreateOptions createOptions;
    private final DocumentDeleteOptions deleteOptions;
    private final int batchSize;
    private final boolean deleteEnabled;
    private final int maxRetries;
    private final int retryBackoffMs;
    private final boolean tolerateDataErrors;
    private final boolean logDataErrors;
    /** Additional, configuration-supplied error numbers classified as data errors. */
    private final Set<Integer> extraDataErrorsNums;
    /** Retries left for the batch currently being written; reset to {@code maxRetries} on success. */
    private int remainingRetries;
    /** Index of the first record of the current {@code put} collection that is not yet written. */
    private int currentOffset = 0;
    /** Record blamed for the most recent per-record error in the current batch, if any. */
    private SinkRecord errorRecord = null;

    /** Operation a sink record is mapped to. */
    public enum Type {
        INSERT,
        DELETE
    }

    /**
     * Creates a writer for the given collection.
     *
     * @param arangoSinkConfig connector configuration supplying options, batch size and retry settings
     * @param arangoCollection target collection
     * @param sinkTaskContext  task context used to request a retry timeout and to obtain the
     *                         errant record reporter
     */
    public ArangoWriter(ArangoSinkConfig arangoSinkConfig, ArangoCollection arangoCollection, SinkTaskContext sinkTaskContext) {
        this.createOptions = arangoSinkConfig.getCreateOptions();
        this.deleteOptions = arangoSinkConfig.getDeleteOptions();
        this.batchSize = arangoSinkConfig.getBatchSize();
        this.deleteEnabled = arangoSinkConfig.isDeleteEnabled();
        this.maxRetries = arangoSinkConfig.getMaxRetries();
        this.retryBackoffMs = arangoSinkConfig.getRetryBackoffMs();
        this.tolerateDataErrors = arangoSinkConfig.getTolerateDataErrors();
        this.logDataErrors = arangoSinkConfig.getLogDataErrors();
        this.extraDataErrorsNums = arangoSinkConfig.getExtraDataErrorsNums();
        this.remainingRetries = this.maxRetries;
        this.col = arangoCollection;
        this.context = sinkTaskContext;
        this.reporter = sinkTaskContext.errantRecordReporter();
        if (this.reporter == null) {
            LOG.info("Errant record reporter not configured.");
        }
        this.keyConverter = new KeyConverter();
        this.converter = new RecordConverter(this.keyConverter);
    }

    /**
     * Writes the given records batch by batch.
     *
     * <p>Progress ({@code currentOffset}) is only reset when the whole collection has been
     * processed. If a batch fails with an exception, the offset stays where it is, so a later
     * call resumes at the failed batch.
     *
     * @param collection records to write; an empty collection is a no-op
     * @throws RetriableException if a transient error occurred and retries remain
     * @throws ConnectException   if a transient error occurred with no retries left, a data error
     *                            is not tolerated, or the batch failed in another way
     */
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        LOG.trace("Handling {} record(s)", collection.size());
        List<SinkRecord> records = new ArrayList<>(collection);
        while (currentOffset < collection.size()) {
            errorRecord = null;
            List<SinkRecord> batch = extractBatch(records);
            if (batch.isEmpty()) {
                break;
            }
            LOG.trace("Handling batch of {} record(s)", batch.size());
            try {
                handleBatch(batch);
            } catch (TransientException e) {
                // Always throws, so the offset below is not advanced for a failed batch.
                handleTransientException(e);
            } catch (DataException e) {
                handleDataException(e);
            }
            remainingRetries = maxRetries;
            currentOffset += batch.size();
        }
        remainingRetries = maxRetries;
        currentOffset = 0;
    }

    /**
     * Returns the next batch starting at {@code currentOffset}: the leading run of records that
     * share the operation type of the first pending record, capped at {@code batchSize}.
     */
    private List<SinkRecord> extractBatch(List<SinkRecord> list) {
        List<SinkRecord> pending = list.subList(currentOffset, list.size());
        if (pending.isEmpty()) {
            return pending;
        }
        Type type = getType(pending.get(0));
        int sameTypeRun = IntStream.range(0, pending.size())
                .filter(i -> type != getType(pending.get(i)))
                .findFirst()
                .orElse(pending.size());
        return pending.subList(0, Math.min(sameTypeRun, batchSize));
    }

    /** A record with a non-null key and a null value is a delete; every other record is an insert. */
    private Type getType(SinkRecord sinkRecord) {
        boolean tombstone = sinkRecord.key() != null && sinkRecord.value() == null;
        return tombstone ? Type.DELETE : Type.INSERT;
    }

    /**
     * Dispatches a single-type batch to the insert or delete handler.
     *
     * @throws ConnectException any failure, passed through {@link #wrapException(Exception)}
     */
    private void handleBatch(List<SinkRecord> list) {
        if (LOG.isTraceEnabled()) {
            String coordinates = list.stream()
                    .map(r -> String.format("%s-%s-%s", r.topic(), r.kafkaPartition(), r.kafkaOffset()))
                    .collect(Collectors.joining("\n\t", "\n\t", "\n"));
            LOG.trace("Handling batch of {} records: {}", list.size(), coordinates);
        }
        try {
            switch (getType(list.get(0))) {
                case DELETE:
                    handleBatchDelete(list);
                    break;
                case INSERT:
                    handleBatchInsert(list);
                    break;
                default:
                    throw new IllegalStateException();
            }
            LOG.trace("Completed handling batch");
        } catch (Exception e) {
            throw wrapException(e);
        }
    }

    /**
     * Deletes the documents addressed by the batch's record keys.
     *
     * <p>Results reporting "document not found" are logged at debug level and excluded from the
     * error checks; all other results are checked for transient and data errors.
     *
     * @throws ConnectException if deletes are disabled or the response size does not match
     */
    private void handleBatchDelete(List<SinkRecord> list) {
        if (!deleteEnabled) {
            throw new ConnectException("Deletes are not enabled.");
        }
        List<?> keys = list.stream().map(keyConverter::convert).collect(Collectors.toList());
        LOG.trace("Deleting documents: {}", keys);
        List<Object> results = col.deleteDocuments(keys, deleteOptions).getDocumentsAndErrors();
        checkResultSize(list, results);

        List<SinkRecord> checkedRecords = new ArrayList<>();
        List<Object> checkedResults = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            Object result = results.get(i);
            if (result instanceof ErrorEntity
                    && ((ErrorEntity) result).getErrorNum() == DOCUMENT_NOT_FOUND_ERROR_NUM) {
                LOG.debug("Deleting document not found: {}", keys.get(i));
                continue;
            }
            // Keep records and results index-aligned for the checks below.
            checkedRecords.add(list.get(i));
            checkedResults.add(result);
        }
        checkTransientErrors(checkedRecords, checkedResults);
        checkDataErrors(checkedRecords, checkedResults);
    }

    /**
     * Converts the batch's records to documents and inserts them.
     *
     * @throws ConnectException if the response size does not match the batch size
     */
    private void handleBatchInsert(List<SinkRecord> list) {
        // The helper lets the compiler infer the converter's document type for the key lookup.
        List<?> documents = traceDocumentKeys(
                list.stream().map(converter::convert).collect(Collectors.toList()),
                document -> document.get(DocumentFields.KEY));
        List<Object> results = col.insertDocuments(documents, createOptions).getDocumentsAndErrors();
        checkResultSize(list, results);
        checkTransientErrors(list, results);
        checkDataErrors(list, results);
    }

    /** Logs the key of every document at trace level (when enabled) and returns the documents unchanged. */
    private static <D> List<D> traceDocumentKeys(List<D> documents, Function<? super D, ?> keyOf) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Inserting documents: {}", documents.stream().map(keyOf).collect(Collectors.toList()));
        }
        return documents;
    }

    /**
     * Throws for the first result that is an error not classified as a data error, remembering
     * the corresponding record in {@code errorRecord}.
     *
     * <p>Callers run this before {@link #checkDataErrors(List, List)}, so when a transient error
     * is present the method throws before any data error of the batch is handled.
     *
     * @param list  records, index-aligned with {@code list2}
     * @param list2 per-record results returned by the server
     */
    private void checkTransientErrors(List<SinkRecord> list, List<Object> list2) {
        for (int i = 0; i < list2.size(); i++) {
            Object result = list2.get(i);
            if (!(result instanceof ErrorEntity)) {
                continue;
            }
            ErrorEntity error = (ErrorEntity) result;
            if (!isDataError(error.getErrorNum())) {
                errorRecord = list.get(i);
                throw new TransientException(new ArangoDBException(error));
            }
        }
    }

    /**
     * Passes every result classified as a data error to {@link #handleDataException(DataException)},
     * with {@code errorRecord} set to the corresponding record.
     *
     * @param list  records, index-aligned with {@code list2}
     * @param list2 per-record results returned by the server
     */
    private void checkDataErrors(List<SinkRecord> list, List<Object> list2) {
        for (int i = 0; i < list2.size(); i++) {
            Object result = list2.get(i);
            if (!(result instanceof ErrorEntity)) {
                continue;
            }
            ErrorEntity error = (ErrorEntity) result;
            if (isDataError(error.getErrorNum())) {
                errorRecord = list.get(i);
                handleDataException(new DataException(new ArangoDBException(error)));
            }
        }
    }

    /** Verifies that the server returned exactly one result per record. */
    private void checkResultSize(List<SinkRecord> list, List<Object> list2) {
        if (list.size() != list2.size()) {
            throw new ConnectException("Response length [" + list2.size() + "] does not match batch length [" + list.size() + "].");
        }
    }

    /**
     * Classifies an exception: a {@link ConnectException} is returned as is, an
     * {@link ArangoDBException} carrying a data error number becomes a {@link DataException},
     * and everything else becomes a {@code TransientException}.
     */
    private ConnectException wrapException(Exception exc) {
        if (exc instanceof ConnectException) {
            return (ConnectException) exc;
        }
        if (exc instanceof ArangoDBException && isDataError(((ArangoDBException) exc).getErrorNum())) {
            return new DataException(exc);
        }
        return new TransientException(exc);
    }

    /**
     * Tells whether the error number is one of the built-in or configured data error numbers.
     *
     * @param num error number; {@code null} (no error number available) is never a data error
     */
    private boolean isDataError(Integer num) {
        // Guard first: contains(null) throws on null-hostile Set implementations.
        if (num == null) {
            return false;
        }
        return DATA_ERROR_NUMS.contains(num) || extraDataErrorsNums.contains(num);
    }

    /**
     * Handles a data error for the record stored in {@code errorRecord}.
     *
     * <p>The error is logged (at warn level when {@code logDataErrors} is set, otherwise at
     * debug level). If data errors are tolerated, the record is reported to the errant record
     * reporter when one is available; otherwise the exception is rethrown.
     *
     * @throws ConnectException if no record can be blamed (the whole batch write failed); the
     *                          data exception is attached as the cause
     * @throws DataException    if data errors are not tolerated
     */
    private void handleDataException(DataException dataException) {
        if (errorRecord == null) {
            throw new ConnectException("Got data exception in batch write!", dataException);
        }
        if (logDataErrors) {
            LOG.warn("Got data exception while processing record: {}", errorRecord, dataException);
        } else {
            LOG.debug("Got data exception while processing record: {}", errorRecord, dataException);
        }
        if (!tolerateDataErrors) {
            throw dataException;
        }
        if (reporter == null) {
            LOG.debug("Ignoring exception:", dataException);
        } else {
            LOG.debug("Reporting exception to DLQ:", dataException);
            reporter.report(errorRecord, dataException);
        }
    }

    /**
     * Handles a transient error; this method never returns normally.
     *
     * <p>With no retries left, the retry counter is reset and the exception is rethrown. Otherwise
     * the counter is decremented, a timeout of {@code retryBackoffMs} is requested from the task
     * context, and a {@link RetriableException} wrapping the error is thrown.
     */
    private void handleTransientException(TransientException transientException) {
        LOG.warn("Got transient exception: ", transientException);
        if (errorRecord != null) {
            LOG.debug("Got transient exception while processing record: {}", errorRecord, transientException);
        }
        if (remainingRetries <= 0) {
            remainingRetries = maxRetries;
            throw transientException;
        }
        LOG.info("remaining retries: {}", remainingRetries);
        remainingRetries--;
        context.timeout(retryBackoffMs);
        throw new RetriableException(transientException);
    }
}
