package com.arangodb.kafka;

import com.arangodb.ArangoDB;
import com.arangodb.Request;
import com.arangodb.config.HostDescription;
import com.arangodb.kafka.config.ArangoSinkConfig;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/arangodb/kafka/HostListMonitor.class */
public class HostListMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(HostListMonitor.class);
    private final ArangoDB adb;
    private final ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
    private final ConnectorContext context;
    private final int acquireHostIntervalMs;
    private volatile Set<HostDescription> endpoints;

    public HostListMonitor(ArangoSinkConfig arangoSinkConfig, ConnectorContext connectorContext) {
        this.acquireHostIntervalMs = arangoSinkConfig.getAcquireHostIntervalMs();
        this.endpoints = arangoSinkConfig.getEndpoints();
        this.adb = arangoSinkConfig.createMonitorClient();
        this.context = connectorContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        LOG.info("starting host list monitor background task");
        updateHostList();
        this.es.scheduleAtFixedRate(this::monitorHosts, this.acquireHostIntervalMs, this.acquireHostIntervalMs, TimeUnit.MILLISECONDS);
    }

    public Set<HostDescription> getEndpoints() {
        return this.endpoints;
    }

    public void stop() {
        LOG.info("stopping host list monitor background task");
        this.adb.shutdown();
        this.es.shutdown();
        try {
            if (!this.es.awaitTermination(10000L, TimeUnit.MILLISECONDS)) {
                this.es.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.es.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    private Set<HostDescription> acquireHostList() {
        LOG.trace("acquiring host list");
        try {
            return parseAcquireHostListResponse((ObjectNode) this.adb.execute(Request.builder().method(Request.Method.GET).path("/_api/cluster/endpoints").build(), ObjectNode.class).getBody());
        } catch (Exception e) {
            LOG.warn("Got exception while acquiring the host list: ", e);
            return Collections.emptySet();
        }
    }

    private Set<HostDescription> parseAcquireHostListResponse(ObjectNode objectNode) {
        HashSet hashSet = new HashSet();
        Iterator it = objectNode.get("endpoints").iterator();
        while (it.hasNext()) {
            hashSet.add(HostDescription.parse(((JsonNode) it.next()).get("endpoint").textValue().replaceFirst(".*://", "")));
        }
        return hashSet;
    }

    private boolean updateHostList() {
        LOG.debug("Fetching host list.");
        Set<HostDescription> acquireHostList = acquireHostList();
        if (acquireHostList.isEmpty() || this.endpoints.equals(acquireHostList)) {
            return false;
        }
        LOG.info("Detected change in the acquired host list: \n\t old: {} \n\t new: {}", this.endpoints, acquireHostList);
        this.endpoints = acquireHostList;
        return true;
    }

    private void monitorHosts() {
        if (updateHostList()) {
            LOG.info("Requesting tasks reconfiguration.");
            this.context.requestTaskReconfiguration();
        }
    }
}
