package io.mindmaps.engine.loader;

import io.mindmaps.engine.loader.TransactionState;
import io.mindmaps.engine.util.ConfigProperties;
import io.mindmaps.graql.Var;
import io.mindmaps.util.ErrorMessage;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import javax.xml.ws.http.HTTPException;
import mjson.Json;
import org.apache.commons.io.IOUtils;

/* loaded from: input_file:io/mindmaps/engine/loader/DistributedLoader.class */
public class DistributedLoader extends Loader {
    private Future future;
    private long pollingFrequency;
    private String graphName;
    private int currentHost;
    private String[] hostsArray;
    private Map<String, Semaphore> availability;
    private Map<String, Integer> jobsTerminated;
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private static final String POST = "http://%s:" + ConfigProperties.getInstance().getProperty(ConfigProperties.SERVER_PORT_NUMBER) + "/transaction/new?graphName=%s";
    private static final String GET = "http://%s:" + ConfigProperties.getInstance().getProperty(ConfigProperties.SERVER_PORT_NUMBER) + "/transaction/loaderState";

    public DistributedLoader(String str, Collection<String> collection) {
        ConfigProperties configProperties = ConfigProperties.getInstance();
        this.batchSize = configProperties.getPropertyAsInt(ConfigProperties.BATCH_SIZE_PROPERTY);
        this.graphName = str;
        this.batch = new HashSet();
        this.hostsArray = (String[]) collection.toArray(new String[collection.size()]);
        this.currentHost = 0;
        this.pollingFrequency = configProperties.getPropertyAsLong(ConfigProperties.POLLING_FREQUENCY_PROPERTY);
        this.threadsNumber = configProperties.getAvailableThreads() * 3;
        this.availability = new HashMap();
        collection.forEach(str2 -> {
            this.availability.put(str2, new Semaphore(this.threadsNumber));
        });
        this.jobsTerminated = new HashMap();
        collection.forEach(str3 -> {
            this.jobsTerminated.put(str3, 0);
        });
    }

    @Override // io.mindmaps.engine.loader.Loader
    public void setThreadsNumber(int i) {
        this.threadsNumber = i;
        this.availability.keySet().forEach(str -> {
            this.availability.put(str, new Semaphore(this.threadsNumber));
        });
    }

    public void setPollingFrequency(long j) {
        this.pollingFrequency = j;
    }

    @Override // io.mindmaps.engine.loader.Loader
    public void waitToFinish() {
        if (this.future != null) {
            flush();
            try {
                this.future.get();
            } catch (InterruptedException | ExecutionException e) {
                this.LOG.error(e.getMessage());
            }
        }
        this.LOG.info("All tasks done!");
    }

    @Override // io.mindmaps.engine.loader.Loader
    public void submitBatch(Collection<Var> collection) {
        String str = (String) collection.stream().map(var -> {
            return var + ";";
        }).collect(Collectors.joining(" "));
        if (str.length() == 0) {
            return;
        }
        HttpURLConnection acquireNextHost = acquireNextHost();
        executePost(acquireNextHost, "insert " + str);
        int responseCode = getResponseCode(acquireNextHost);
        if (responseCode != 201) {
            throw new HTTPException(responseCode);
        }
        markAsLoading(getResponseBody(acquireNextHost));
        this.LOG.info("Transaction sent to host: " + this.hostsArray[this.currentHost]);
        if (this.future == null) {
            startCheckingStatus();
        }
        acquireNextHost.disconnect();
    }

    private String getHostState(String str) {
        HttpURLConnection host = getHost(str, GET);
        String responseBody = getResponseBody(host);
        host.disconnect();
        return responseBody;
    }

    private boolean transactionsIsEmpty() {
        return this.loadingJobs.get() + this.enqueuedJobs.get() == 0;
    }

    private void startCheckingStatus() {
        this.future = executor.submit(this::checkForStatusLoop);
    }

    private void stopCheckingStatus() {
        executor.shutdownNow();
        executor = Executors.newSingleThreadExecutor();
        this.future = null;
    }

    public void checkForStatusLoop() {
        while (!transactionsIsEmpty()) {
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            int i4 = 0;
            for (String str : this.availability.keySet()) {
                Json read = Json.read(getHostState(str));
                this.LOG.info("State from host [" + str + "]:");
                this.LOG.info(read.toString());
                int asInteger = read.at(TransactionState.State.QUEUED.name()).asInteger();
                int asInteger2 = read.at(TransactionState.State.LOADING.name()).asInteger();
                int asInteger3 = read.at(TransactionState.State.ERROR.name()).asInteger();
                int asInteger4 = read.at(TransactionState.State.FINISHED.name()).asInteger();
                int i5 = asInteger4 + asInteger3;
                this.availability.get(str).release(i5 - this.jobsTerminated.get(str).intValue());
                this.jobsTerminated.put(str, Integer.valueOf(i5));
                i += asInteger;
                i2 += asInteger2;
                i3 += asInteger3;
                i4 += asInteger4;
            }
            this.enqueuedJobs.set(i);
            this.loadingJobs.set(i2);
            this.errorJobs.set(i3);
            this.finishedJobs.set(i4);
            printLoaderState();
            try {
                Thread.sleep(this.pollingFrequency);
            } catch (InterruptedException e) {
                this.LOG.error("Exception", e);
            }
        }
        stopCheckingStatus();
    }

    private HttpURLConnection acquireNextHost() {
        String nextHost = nextHost();
        while (true) {
            String str = nextHost;
            if (this.availability.get(str).tryAcquire()) {
                return getHost(str, POST);
            }
            nextHost = nextHost();
        }
    }

    private String nextHost() {
        this.currentHost++;
        if (this.currentHost == this.hostsArray.length) {
            this.currentHost = 0;
        }
        return this.hostsArray[this.currentHost];
    }

    private HttpURLConnection getHost(String str, String str2) {
        HttpURLConnection httpURLConnection = null;
        try {
            httpURLConnection = (HttpURLConnection) new URL(String.format(str2, str, this.graphName)).openConnection();
            httpURLConnection.setDoOutput(true);
        } catch (IOException e) {
            this.LOG.error("IOException", e);
        }
        return httpURLConnection;
    }

    private String executePost(HttpURLConnection httpURLConnection, String str) {
        try {
            httpURLConnection.setRequestMethod("POST");
            httpURLConnection.addRequestProperty("Content-Type", "application/POST");
            httpURLConnection.setRequestProperty("Content-Length", Integer.toString(str.length()));
            httpURLConnection.getOutputStream().write(str.getBytes("UTF8"));
            return httpURLConnection.getResponseMessage();
        } catch (IOException e) {
            this.LOG.error(ErrorMessage.ERROR_COMMUNICATING_TO_HOST.getMessage(new Object[]{httpURLConnection.getURL().toString()}));
            return null;
        } catch (HTTPException e2) {
            this.LOG.error(ErrorMessage.ERROR_IN_DISTRIBUTED_TRANSACTION.getMessage(new Object[]{httpURLConnection.getURL().toString(), Integer.valueOf(e2.getStatusCode()), getResponseMessage(httpURLConnection), str}));
            return null;
        }
    }

    private String getResponseMessage(HttpURLConnection httpURLConnection) {
        try {
            return httpURLConnection.getResponseMessage();
        } catch (IOException e) {
            this.LOG.error("IOException", e);
            return null;
        }
    }

    private String getResponseBody(HttpURLConnection httpURLConnection) {
        try {
            return IOUtils.toString(httpURLConnection.getInputStream());
        } catch (IOException e) {
            this.LOG.error("IOException", e);
            return null;
        }
    }

    private int getResponseCode(HttpURLConnection httpURLConnection) {
        try {
            return httpURLConnection.getResponseCode();
        } catch (IOException e) {
            this.LOG.error("IOException", e);
            return 0;
        }
    }
}
