package io.activej.dataflow.graph;

import io.activej.async.exception.AsyncCloseException;
import io.activej.common.Checks;
import io.activej.dataflow.exception.DataflowException;
import io.activej.dataflow.inject.DatasetIdModule;
import io.activej.dataflow.node.Node;
import io.activej.dataflow.node.impl.Download;
import io.activej.dataflow.node.impl.Upload;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/dataflow/graph/Task.class */
/**
 * A single dataflow task: a list of {@link Node}s bound to one environment.
 *
 * <p>Lifecycle as implemented here:
 * <ol>
 *   <li>{@link #bind()} lets every node register its stream suppliers and consumers
 *       through {@link #export} and {@link #bindChannel};</li>
 *   <li>{@link #execute()} streams every exported supplier into the consumer
 *       registered under the same {@link StreamId};</li>
 *   <li>the outcome is recorded in the status, the error and the execution promise.</li>
 * </ol>
 */
public final class Task {
    /** Maps a node to the identifier used for it in the GraphViz output ({@code "n" + index}). */
    private static final Function<Node, String> NODE_ID_FUNCTION = node -> "n" + node.getIndex();

    private final long taskId;
    private final ResourceLocator environment;
    private final DatasetIdModule.DatasetIds datasetIds;
    private final List<Node> nodes;

    /** Exported suppliers, in export order. */
    private final Map<StreamId, StreamSupplier<?>> suppliers = new LinkedHashMap<>();
    /** Bound consumers, in binding order. */
    private final Map<StreamId, StreamConsumer<?>> consumers = new LinkedHashMap<>();
    /** Completed with the outcome of {@link #execute()}. */
    private final SettablePromise<Void> executionPromise = new SettablePromise<>();
    /** Guards against {@link #bind()} being called twice. */
    private final AtomicBoolean bound = new AtomicBoolean();

    private TaskStatus status = TaskStatus.RUNNING;

    /** Set when {@link #execute()} is called. */
    @Nullable
    private Instant started;

    /** Set when the promise returned by {@link #execute()} completes. */
    @Nullable
    private Instant finished;

    /** Failure of the execution, or {@code null} if it has not failed. */
    @Nullable
    private Exception error;

    /**
     * Acknowledgements collected for the node currently being bound;
     * non-null only while {@link #bind()} is inside {@code Node.createAndBind}.
     */
    @Nullable
    private List<Promise<Void>> currentNodeAcks;

    /**
     * Creates a task.
     *
     * @param taskId      identifier returned by {@link #getTaskId()}
     * @param environment locator used to resolve instances; a
     *                    {@link DatasetIdModule.DatasetIds} instance is looked up from it immediately
     * @param nodes       nodes of this task; the list is stored as is, not copied
     */
    public Task(long taskId, ResourceLocator environment, List<Node> nodes) {
        this.taskId = taskId;
        this.environment = environment;
        this.nodes = nodes;
        this.datasetIds = environment.getInstance(DatasetIdModule.DatasetIds.class);
    }

    /**
     * Asks every node to create and bind its streams, and arranges for
     * {@code Node.finish} to be called once all streams bound by that node are acknowledged.
     *
     * @throws IllegalStateException if the task was already bound
     */
    public void bind() {
        if (!bound.compareAndSet(false, true)) {
            throw new IllegalStateException("Task was already bound!");
        }
        try {
            for (Node node : nodes) {
                List<Promise<Void>> acks = new ArrayList<>();
                currentNodeAcks = acks;
                node.createAndBind(this);
                Promises.all(acks).whenComplete((result, e) -> node.finish(e));
            }
        } finally {
            // Reset even when createAndBind throws, so that bindChannel/export
            // keep rejecting calls made outside of createAndBind.
            currentNodeAcks = null;
        }
    }

    /**
     * Resolves an instance from the environment by dataset id.
     *
     * @param str dataset id, translated to a key by {@code DatasetIds.getKeyForId}
     * @return the instance bound to that key
     */
    public Object get(String str) {
        return environment.getInstance(datasetIds.getKeyForId(str));
    }

    /** Resolves an instance of the given class from the environment. */
    public <T> T get(Class<T> cls) {
        return environment.getInstance(cls);
    }

    /** Resolves an instance for the given key from the environment. */
    public <T> T get(Key<T> key) {
        return environment.getInstance(key);
    }

    /**
     * Registers the consumer side of a stream. May only be called while a node is being bound.
     *
     * @throws IllegalStateException if a consumer is already bound for {@code streamId},
     *                               or if called outside of {@code createAndBind}
     *                               (as reported by {@code Checks.checkState})
     */
    public <T> void bindChannel(StreamId streamId, StreamConsumer<T> streamConsumer) {
        Checks.checkState(!consumers.containsKey(streamId), "Already bound");
        Checks.checkState(currentNodeAcks != null, "Must bind streams only from createAndBind");
        consumers.put(streamId, streamConsumer);
        currentNodeAcks.add(streamConsumer.getAcknowledgement());
    }

    /**
     * Registers the supplier side of a stream. May only be called while a node is being bound.
     *
     * @throws IllegalStateException if a supplier is already exported for {@code streamId},
     *                               or if called outside of {@code createAndBind}
     *                               (as reported by {@code Checks.checkState})
     */
    public <T> void export(StreamId streamId, StreamSupplier<T> streamSupplier) {
        Checks.checkState(!suppliers.containsKey(streamId), "Already exported");
        Checks.checkState(currentNodeAcks != null, "Must bind streams only from createAndBind");
        suppliers.put(streamId, streamSupplier);
        currentNodeAcks.add(streamSupplier.getAcknowledgement());
    }

    /**
     * Streams every exported supplier into the consumer bound under the same stream id.
     *
     * <p>When all transfers complete, records the finish time, the error (wrapped in a
     * {@link DataflowException} unless it already is one) and the resulting status, then
     * completes the execution promise with the same outcome.
     *
     * <p>NOTE(review): only exported suppliers are visited, so a consumer bound without a
     * matching supplier is never connected nor reported here — confirm this is intended.
     * NOTE(review): a second call would complete the execution promise again — presumably
     * this method is meant to be called once; confirm against callers.
     *
     * @return promise of all stream transfers
     */
    public Promise<Void> execute() {
        started = Instant.now();
        List<Promise<Void>> transfers = new ArrayList<>(suppliers.size());
        for (Map.Entry<StreamId, StreamSupplier<?>> entry : suppliers.entrySet()) {
            transfers.add(connect(entry.getKey(), entry.getValue()));
        }
        return Promises.all(transfers).whenComplete((result, e) -> {
            finished = Instant.now();
            if (e == null || e instanceof DataflowException) {
                error = e;
            } else {
                error = new DataflowException(e);
            }
            if (e == null) {
                status = TaskStatus.COMPLETED;
            } else if (e instanceof AsyncCloseException) {
                status = TaskStatus.CANCELED;
            } else {
                status = TaskStatus.FAILED;
            }
            executionPromise.set(result, e);
        });
    }

    /**
     * Streams one supplier into the consumer registered for the same stream id.
     * Any exception thrown while connecting is returned as a failed promise
     * wrapping it in a {@link DataflowException}.
     */
    @SuppressWarnings("unchecked") // both ends are registered under the same StreamId; element types are not tracked
    private Promise<Void> connect(StreamId streamId, StreamSupplier<?> exported) {
        try {
            StreamSupplier<Object> supplier = (StreamSupplier<Object>) exported;
            StreamConsumer<Object> consumer = (StreamConsumer<Object>) consumers.get(streamId);
            Checks.checkNotNull(supplier, "Supplier not found for %s, consumer %s", new Object[]{streamId, consumer});
            Checks.checkNotNull(consumer, "Consumer not found for %s, supplier %s", new Object[]{streamId, supplier});
            return supplier.streamTo(consumer);
        } catch (Exception e) {
            return Promise.ofException(new DataflowException(e));
        }
    }

    /** Closes every exported supplier, then every bound consumer. */
    public void cancel() {
        for (StreamSupplier<?> supplier : suppliers.values()) {
            supplier.close();
        }
        for (StreamConsumer<?> consumer : consumers.values()) {
            consumer.close();
        }
    }

    /** Returns the promise that is completed with the outcome of {@link #execute()}. */
    public Promise<Void> getExecutionPromise() {
        return executionPromise;
    }

    /** Returns {@code true} once the execution promise is complete. */
    public boolean isExecuted() {
        return executionPromise.isComplete();
    }

    /** Returns the current status; {@code RUNNING} until the execution completes. */
    public TaskStatus getStatus() {
        return status;
    }

    /** Returns the instant {@link #execute()} was called, or {@code null} if it was not. */
    @Nullable
    public Instant getStartTime() {
        return started;
    }

    /** Returns the instant the execution completed, or {@code null} if it has not. */
    @Nullable
    public Instant getFinishTime() {
        return finished;
    }

    /** Returns the recorded execution error, or {@code null} if there is none. */
    @Nullable
    public Exception getError() {
        return error;
    }

    /** Returns the node list passed to the constructor (not a copy). */
    public List<Node> getNodes() {
        return nodes;
    }

    /** Returns the identifier passed to the constructor. */
    public long getTaskId() {
        return taskId;
    }

    /**
     * Renders the node graph of this task in the GraphViz DOT language.
     *
     * <p>Edges between nodes that share a stream id are solid. Edges that go through a
     * {@link Download} or end in an {@link Upload} / unknown stream are dashed, and a
     * point-shaped vertex labelled with the stream id is emitted for the remote end.
     * Node declarations follow the edges; failed nodes are red with the stack trace as
     * tooltip, finished nodes are blue.
     *
     * @return the DOT source of the graph
     */
    @JmxAttribute
    public String getGraphViz() {
        Map<StreamId, Node> nodesByInput = new HashMap<>();
        Map<StreamId, Node> nodesByOutput = new HashMap<>();
        Map<StreamId, Node> uploads = new HashMap<>();
        // Download stream id -> stream id of the download's output
        Map<StreamId, StreamId> network = new HashMap<>();

        for (Node node : nodes) {
            if (node instanceof Download) {
                Download download = (Download) node;
                network.put(download.streamId, download.output);
            } else if (node instanceof Upload) {
                uploads.put(((Upload) node).streamId, node);
            } else {
                node.getInputs().forEach(input -> nodesByInput.put(input, node));
                node.getOutputs().forEach(output -> nodesByOutput.put(output, node));
            }
        }

        // Insertion-ordered so that node declarations come out in first-use order.
        Map<Node, String> nodeIds = new LinkedHashMap<>();
        StringBuilder sb = new StringBuilder("digraph {\n");

        for (Node node : nodes) {
            if (node instanceof Download) {
                Download download = (Download) node;
                StreamId streamId = download.streamId;
                // A download whose stream is produced by a node of this task is drawn from that node instead.
                if (nodesByOutput.containsKey(streamId)) {
                    continue;
                }
                Node target = nodesByInput.get(download.output);
                if (target == null) {
                    continue;
                }
                String pointId = NODE_ID_FUNCTION.apply(node);
                appendPoint(sb, pointId, streamId);
                sb.append("  ").append(pointId)
                        .append(" -> ").append(nodeIds.computeIfAbsent(target, NODE_ID_FUNCTION))
                        .append(" [style=dashed]\n");
            } else if (!(node instanceof Upload)) {
                String sourceId = nodeIds.computeIfAbsent(node, NODE_ID_FUNCTION);
                for (StreamId output : node.getOutputs()) {
                    Node direct = nodesByInput.get(output);
                    if (direct != null) {
                        sb.append("  ").append(sourceId)
                                .append(" -> ").append(nodeIds.computeIfAbsent(direct, NODE_ID_FUNCTION))
                                .append("\n");
                        continue;
                    }
                    Node viaDownload = nodesByInput.get(network.get(output));
                    if (viaDownload != null) {
                        sb.append("  ").append(sourceId)
                                .append(" -> ").append(nodeIds.computeIfAbsent(viaDownload, NODE_ID_FUNCTION));
                    } else {
                        Node upload = uploads.get(output);
                        String pointId = upload != null ? NODE_ID_FUNCTION.apply(upload) : "s" + output.getId();
                        appendPoint(sb, pointId, output);
                        sb.append("  ").append(sourceId).append(" -> ").append(pointId);
                    }
                    sb.append(" [style=dashed]\n");
                }
            }
        }

        sb.append('\n');
        nodeIds.forEach((node, id) -> appendNodeDeclaration(sb, node, id));
        return sb.append('}').toString();
    }

    /** Appends a point-shaped vertex with the given id, labelled with the stream id. */
    private static void appendPoint(StringBuilder sb, String pointId, StreamId streamId) {
        sb.append("  ").append(pointId)
                .append(" [id=\"").append(pointId)
                .append("\", shape=point, xlabel=\"").append(streamId.getId())
                .append("\"]\n");
    }

    /** Appends the declaration of a node: label, id and a color reflecting its error/finished state. */
    private static void appendNodeDeclaration(StringBuilder sb, Node node, String id) {
        sb.append("  ").append(id)
                .append(" [label=\"").append(node.getClass().getSimpleName())
                .append("\" id=").append(id);
        Exception nodeError = node.getError();
        if (nodeError != null) {
            StringWriter stackTrace = new StringWriter();
            nodeError.printStackTrace(new PrintWriter(stackTrace));
            sb.append(" color=red tooltip=\"").append(escapeDotString(stackTrace.toString())).append("\"");
        } else if (node.getFinished() != null) {
            sb.append(" color=blue");
        }
        sb.append("]\n");
    }

    /**
     * Escapes text for use inside a double-quoted DOT string.
     * Backslashes are doubled first so that a backslash in the text can neither swallow
     * the escaping of a following quote nor be read by GraphViz as an escape sequence.
     */
    private static String escapeDotString(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}
