package org.apache.pulsar.io.azuredataexplorer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.microsoft.azure.kusto.data.StringUtils;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.KustoDataExceptionBase;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.IngestionResult;
import com.microsoft.azure.kusto.ingest.result.IngestionStatus;
import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.ByteArrayInputStream;
import java.net.ConnectException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "adx", type = IOType.SINK, help = "The ADXSink is used for moving messages from Pulsar to ADX.", configClass = ADXSinkConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/azuredataexplorer/ADXSink.class */
public class ADXSink implements Sink<byte[]> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ADXSink.class);
    private final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    IngestionProperties ingestionProperties;
    private IngestClient ingestClient;
    private List<Record<byte[]>> incomingRecordsList;
    private int batchSize;
    private long batchTimeMs;
    private ScheduledExecutorService adxSinkExecutor;
    private int maxRetryAttempts;
    private long retryBackOffTime;

    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        log.info("Open ADX Sink");
        ADXSinkConfig load = ADXSinkConfig.load(map, sinkContext);
        load.validate();
        ConnectionStringBuilder connectionStringBuilder = getConnectionStringBuilder(load);
        if (connectionStringBuilder == null) {
            throw new Exception("Kusto Connection String NULL");
        }
        log.debug("ConnectionString created: {}.", connectionStringBuilder);
        this.ingestClient = load.getManagedIdentityId() != null ? IngestClientFactory.createManagedStreamingIngestClient(connectionStringBuilder) : IngestClientFactory.createClient(connectionStringBuilder);
        this.ingestionProperties = new IngestionProperties(load.getDatabase(), load.getTable());
        this.ingestionProperties.setIngestionMapping(load.getMappingRefName(), getParseMappingRefType(load.getMappingRefType()));
        this.ingestionProperties.setReportLevel(IngestionProperties.IngestionReportLevel.FAILURES_AND_SUCCESSES);
        this.ingestionProperties.setReportMethod(IngestionProperties.IngestionReportMethod.TABLE);
        this.ingestionProperties.setFlushImmediately(load.isFlushImmediately());
        this.ingestionProperties.setDataFormat(IngestionProperties.DataFormat.MULTIJSON);
        log.debug("Ingestion Properties:  {}", this.ingestionProperties.toString());
        this.maxRetryAttempts = load.getMaxRetryAttempts() + 1;
        this.retryBackOffTime = load.getRetryBackOffTime();
        this.batchSize = load.getBatchSize();
        this.batchTimeMs = load.getBatchTimeMs();
        this.incomingRecordsList = new ArrayList();
        this.adxSinkExecutor = Executors.newScheduledThreadPool(1);
        this.adxSinkExecutor.scheduleAtFixedRate(this::sinkData, this.batchTimeMs, this.batchTimeMs, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<byte[]> record) {
        int size;
        synchronized (this) {
            this.incomingRecordsList.add(record);
            size = this.incomingRecordsList.size();
        }
        if (size == this.batchSize) {
            this.adxSinkExecutor.execute(this::sinkData);
        }
    }

    private void sinkData() {
        IngestionResult ingestFromStream;
        synchronized (this) {
            if (this.incomingRecordsList.isEmpty()) {
                return;
            }
            List<Record<byte[]>> list = this.incomingRecordsList;
            this.incomingRecordsList = new ArrayList();
            LinkedList linkedList = new LinkedList();
            for (Record<byte[]> record : list) {
                try {
                    linkedList.add(getADXPulsarEvent(record));
                } catch (Exception e) {
                    record.fail();
                    log.error("Failed to collect the record for ADX cluster.", (Throwable) e);
                }
            }
            int i = 0;
            while (true) {
                try {
                    try {
                        try {
                            ingestFromStream = this.ingestClient.ingestFromStream(new StreamSourceInfo(new ByteArrayInputStream(this.mapper.writeValueAsBytes(linkedList))), this.ingestionProperties);
                        } catch (IngestionServiceException e2) {
                            Throwable cause = e2.getCause();
                            if ((cause instanceof KustoDataExceptionBase) && ((KustoDataExceptionBase) cause).isPermanent()) {
                                list.forEach((v0) -> {
                                    v0.fail();
                                });
                                throw new ConnectException(e2.getMessage());
                            }
                            backOffForRemainingAttempts(i, e2, list);
                        }
                        if (!(ingestFromStream instanceof TableReportIngestionResult)) {
                            return;
                        }
                        if (hasStreamingSucceeded(ingestFromStream.getIngestionStatusCollection().get(0))) {
                            list.forEach((v0) -> {
                                v0.ack();
                            });
                            return;
                        } else {
                            i += 3;
                            backOffForRemainingAttempts(i, null, list);
                            i++;
                        }
                    } catch (IngestionClientException | URISyntaxException e3) {
                        list.forEach((v0) -> {
                            v0.fail();
                        });
                        throw new ConnectException(e3.getMessage());
                    }
                } catch (Exception e4) {
                    log.error("Failed to publish the message to ADX cluster", (Throwable) e4);
                    return;
                }
            }
        }
    }

    private boolean hasStreamingSucceeded(@NotNull IngestionStatus ingestionStatus) {
        switch (ingestionStatus.status) {
            case Succeeded:
            case Queued:
            case Pending:
                return true;
            case Skipped:
            case PartiallySucceeded:
                String failureStatus = ingestionStatus.getFailureStatus();
                String details = ingestionStatus.getDetails();
                UUID ingestionSourceId = ingestionStatus.getIngestionSourceId();
                Logger logger = log;
                Object[] objArr = new Object[7];
                objArr[0] = ingestionStatus.getStatus();
                objArr[1] = ingestionStatus.getTable();
                objArr[2] = ingestionStatus.getDatabase();
                objArr[3] = ingestionStatus.getOperationId();
                objArr[4] = ingestionSourceId;
                objArr[5] = StringUtils.isNotEmpty(failureStatus) ? ", failure: " + failureStatus : "";
                objArr[6] = StringUtils.isNotEmpty(details) ? ", details: " + details : "";
                logger.warn("A batch of streaming records has {} ingestion: table:{}, database:{}, operationId: {},ingestionSourceId: {}{}{}.\nStatus is final and therefore ingestion won't be retried and data won't reach dlq", objArr);
                return true;
            case Failed:
            default:
                return false;
        }
    }

    private void backOffForRemainingAttempts(int i, Exception exc, List<Record<byte[]>> list) throws PulsarClientException.ConnectException {
        if (i >= this.maxRetryAttempts) {
            list.forEach((v0) -> {
                v0.fail();
            });
            throw new PulsarClientException.ConnectException(String.format("Retry attempts exhausted, failed to ingest records into KustoDB. Exception: %s", exc.getMessage()));
        }
        long j = this.retryBackOffTime;
        log.error("Failed to ingest records into Kusto, backing off and retrying ingesting records after {} milliseconds.", Long.valueOf(j));
        try {
            TimeUnit.MILLISECONDS.sleep(j);
            throw new InterruptedException();
        } catch (InterruptedException e) {
            list.forEach((v0) -> {
                v0.fail();
            });
            throw new PulsarClientException.ConnectException(String.format("Retrying ingesting records into KustoDB was interrupted after retryAttempts=%s", Integer.valueOf(i + 1)));
        }
    }

    @NotNull
    private ADXPulsarEvent getADXPulsarEvent(@NotNull Record<byte[]> record) throws Exception {
        ADXPulsarEvent aDXPulsarEvent = new ADXPulsarEvent();
        record.getEventTime().ifPresent(l -> {
            aDXPulsarEvent.setEventTime(Instant.ofEpochMilli(l.longValue()));
        });
        Optional<String> key = record.getKey();
        Objects.requireNonNull(aDXPulsarEvent);
        key.ifPresent(aDXPulsarEvent::setKey);
        record.getMessage().ifPresent(message -> {
            aDXPulsarEvent.setProducerName(message.getProducerName());
        });
        record.getMessage().ifPresent(message2 -> {
            aDXPulsarEvent.setSequenceId(message2.getSequenceId());
        });
        aDXPulsarEvent.setValue(new String(record.getValue(), StandardCharsets.UTF_8));
        aDXPulsarEvent.setProperties(new ObjectMapper().writeValueAsString(record.getProperties()));
        return aDXPulsarEvent;
    }

    private IngestionMapping.IngestionMappingKind getParseMappingRefType(String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        boolean z = -1;
        switch (str.hashCode()) {
            case -75029036:
                if (str.equals("PARQUET")) {
                    z = 3;
                    break;
                }
                break;
            case 67046:
                if (str.equals("CSV")) {
                    z = false;
                    break;
                }
                break;
            case 2021682:
                if (str.equals("AVRO")) {
                    z = true;
                    break;
                }
                break;
            case 2286824:
                if (str.equals("JSON")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return IngestionMapping.IngestionMappingKind.CSV;
            case true:
                return IngestionMapping.IngestionMappingKind.AVRO;
            case true:
                return IngestionMapping.IngestionMappingKind.JSON;
            case true:
                return IngestionMapping.IngestionMappingKind.PARQUET;
            default:
                return null;
        }
    }

    private ConnectionStringBuilder getConnectionStringBuilder(@NotNull ADXSinkConfig aDXSinkConfig) {
        if (aDXSinkConfig.getManagedIdentityId() != null) {
            if ("system".equalsIgnoreCase(aDXSinkConfig.getManagedIdentityId())) {
                return ConnectionStringBuilder.createWithAadManagedIdentity(aDXSinkConfig.getClusterUrl());
            }
            ConnectionStringBuilder.createWithAadManagedIdentity(aDXSinkConfig.getClusterUrl(), aDXSinkConfig.getManagedIdentityId());
        }
        return ConnectionStringBuilder.createWithAadApplicationCredentials(aDXSinkConfig.getClusterUrl(), aDXSinkConfig.getAppId(), aDXSinkConfig.getAppKey(), aDXSinkConfig.getTenantId());
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.ingestClient.close();
        this.adxSinkExecutor.shutdown();
        try {
            if (!this.adxSinkExecutor.awaitTermination(2 * this.batchTimeMs, TimeUnit.MILLISECONDS)) {
                this.adxSinkExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.adxSinkExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        log.info("Kusto ingest client closed.");
    }
}
