package io.activej.dataflow.http;

import io.activej.common.exception.UncheckedException;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.dataflow.DataflowException;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandGetTasks;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponsePartitionData;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.command.DataflowResponseTaskData;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.json.JsonCodec;
import io.activej.dataflow.json.JsonUtils;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.StatReducer;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpError;
import io.activej.http.HttpMethod;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.StaticServlet;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.inject.util.Types;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promisable;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/dataflow/http/DataflowDebugServlet.class */
/**
 * Servlet that exposes a debugging view of a set of dataflow partitions over HTTP.
 *
 * <p>Routes registered by the constructor:
 * <ul>
 *   <li>{@code /*} - static resources served from the {@code debug} classpath directory;</li>
 *   <li>{@code GET /api/partitions} - JSON array of {@code "host:port"} strings, one per partition;</li>
 *   <li>{@code GET /api/tasks} - task statuses keyed by task ID, each value being a list indexed
 *       by partition position;</li>
 *   <li>{@code GET /api/tasks/:taskID} - statuses and reduced node statistics of one task,
 *       combined over all partitions;</li>
 *   <li>{@code GET /api/tasks/:taskID/:index} - data of one task as reported by the partition at
 *       position {@code index}.</li>
 * </ul>
 *
 * <p>Every API request that needs task data opens a new TCP connection to the partition(s),
 * sends a {@link DataflowCommandGetTasks} command and reads a single response.
 */
public final class DataflowDebugServlet implements AsyncServlet {
    private final AsyncServlet servlet;
    private final ByteBufsCodec<DataflowResponse, DataflowCommand> codec;

    /**
     * Builds the routing tree of the debug servlet.
     *
     * @param list            partitions to query; positions in this list are the partition
     *                        indexes used by the API
     * @param executor        executor handed to the static servlet that serves the {@code debug}
     *                        classpath resources
     * @param byteBufsCodec   codec used to exchange commands and responses with the partitions
     * @param resourceLocator source of the JSON codecs for {@link ReducedTaskData} and
     *                        {@link LocalTaskData}, and of the optional {@link StatReducer}s
     * @param jsonCodec       codec used to serialize the body of {@code GET /api/tasks}
     */
    public DataflowDebugServlet(List<Partition> list, Executor executor, ByteBufsCodec<DataflowResponse, DataflowCommand> byteBufsCodec, ResourceLocator resourceLocator, JsonCodec<Map<Long, List<TaskStatus>>> jsonCodec) {
        this.codec = byteBufsCodec;
        JsonCodec<ReducedTaskData> reducedTaskCodec = resourceLocator.getInstance(JsonUtils.codec(ReducedTaskData.class));
        JsonCodec<LocalTaskData> localTaskCodec = resourceLocator.getInstance(JsonUtils.codec(LocalTaskData.class));
        this.servlet = RoutingServlet.create()
                .map("/*", StaticServlet.ofClassPath(executor, "debug").withIndexHtml())
                .map("/api/*", RoutingServlet.create()
                        .map(HttpMethod.GET, "/partitions", request ->
                                HttpResponse.ok200().withJson(partitionsJson(list)))
                        .map(HttpMethod.GET, "/tasks", request ->
                                Promises.toList(list.stream().map(partition -> getPartitionData(partition.getAddress())))
                                        .map(responses -> HttpResponse.ok200()
                                                .withJson(JsonUtils.toJson(jsonCodec, collectStatuses(responses)))))
                        .map(HttpMethod.GET, "/tasks/:taskID", request ->
                                getTaskId(request).then(taskId ->
                                        Promises.toList(list.stream().map(partition -> getTask(partition.getAddress(), taskId)))
                                                .then(responses -> reducedTaskResponse(responses, resourceLocator, reducedTaskCodec))))
                        .map(HttpMethod.GET, "/tasks/:taskID/:index", request ->
                                getTaskId(request).then(taskId ->
                                        localTaskResponse(list, request.getPathParameter("index"), taskId, localTaskCodec))));
    }

    /**
     * Renders the partition addresses as a JSON array of {@code "host:port"} strings.
     *
     * @param partitions partitions to render
     * @return the JSON text, {@code []} for an empty list
     */
    private static String partitionsJson(List<Partition> partitions) {
        return partitions.stream()
                .map(Partition::getAddress)
                .map(address -> "\"" + address.getAddress().getHostAddress() + ":" + address.getPort() + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    /**
     * Groups the task statuses reported by the partitions by task ID.
     *
     * @param responses one response per partition, in partition order
     * @return map from task ID to a fixed-size list holding, at position {@code i}, the status
     *         reported by partition {@code i} or {@code null} if that partition did not list
     *         the task
     */
    private static Map<Long, List<TaskStatus>> collectStatuses(List<DataflowResponsePartitionData> responses) {
        int partitionCount = responses.size();
        Map<Long, List<TaskStatus>> statusesByTask = new HashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            for (DataflowResponsePartitionData.TaskDesc task : responses.get(i).getLast()) {
                // Arrays.asList over a fresh array gives a null-filled list that supports set(i, ...)
                statusesByTask
                        .computeIfAbsent(task.getId(), id -> Arrays.asList(new TaskStatus[partitionCount]))
                        .set(i, task.getStatus());
            }
        }
        return statusesByTask;
    }

    /**
     * Builds the HTTP response for {@code GET /api/tasks/:taskID}.
     *
     * @param responses       one (possibly {@code null}) response per partition, in partition order
     * @param resourceLocator source of the {@link StatReducer}s
     * @param codec           codec used to serialize the reduced data
     * @return a 200 response with the reduced task data, or a failed promise carrying a 404
     *         {@link HttpError} when no partition supplied any data
     */
    private static Promise<HttpResponse> reducedTaskResponse(List<DataflowResponseTaskData> responses, ResourceLocator resourceLocator, JsonCodec<ReducedTaskData> codec) {
        ReducedTaskData reduced = reduceTaskData(responses, resourceLocator);
        if (reduced == null) {
            return Promise.ofException(HttpError.ofCode(404, "No task data available"));
        }
        return Promise.of(HttpResponse.ok200().withJson(JsonUtils.toJson(codec, reduced)));
    }

    /**
     * Combines the per-partition data of one task into a single {@link ReducedTaskData}.
     *
     * <p>The GraphViz description is taken from the first non-null response. Node statistics are
     * grouped by node index and reduced with {@link #reduce}; nodes for which the reduction
     * yields {@code null} are left out of the result.
     *
     * @param responses       one (possibly {@code null}) response per partition, in partition order
     * @param resourceLocator source of the {@link StatReducer}s
     * @return the combined data, or {@code null} if {@code responses} is empty or holds only nulls
     */
    @Nullable
    private static ReducedTaskData reduceTaskData(List<DataflowResponseTaskData> responses, ResourceLocator resourceLocator) {
        int partitionCount = responses.size();
        List<TaskStatus> statuses = Arrays.asList(new TaskStatus[partitionCount]);
        Map<Integer, List<NodeStat>> statsByNode = new HashMap<>();
        DataflowResponseTaskData firstPresent = null;

        for (int i = 0; i < partitionCount; i++) {
            DataflowResponseTaskData taskData = responses.get(i);
            if (taskData == null) {
                continue;
            }
            if (firstPresent == null) {
                firstPresent = taskData;
            }
            statuses.set(i, taskData.getStatus());
            int partitionIndex = i; // effectively final copy for the lambda below
            taskData.getNodes().forEach((nodeIndex, stat) ->
                    statsByNode
                            .computeIfAbsent(nodeIndex, key -> Arrays.asList(new NodeStat[partitionCount]))
                            .set(partitionIndex, stat));
        }
        if (firstPresent == null) {
            return null;
        }

        Map<Integer, NodeStat> reducedStats = new HashMap<>();
        statsByNode.forEach((nodeIndex, stats) -> {
            NodeStat reduced = reduce(stats, resourceLocator);
            if (reduced != null) {
                reducedStats.put(nodeIndex, reduced);
            }
        });
        return new ReducedTaskData(statuses, firstPresent.getGraphViz(), reducedStats);
    }

    /**
     * Builds the HTTP response for {@code GET /api/tasks/:taskID/:index}.
     *
     * @param partitions partitions addressable by {@code indexParam}
     * @param indexParam raw value of the {@code index} path parameter
     * @param taskId     ID of the requested task
     * @param codec      codec used to serialize the task data
     * @return a 200 response with the partition-local task data, or a failed promise carrying a
     *         400 {@link HttpError} when {@code indexParam} is not a valid position in
     *         {@code partitions}
     */
    private Promise<HttpResponse> localTaskResponse(List<Partition> partitions, String indexParam, long taskId, JsonCodec<LocalTaskData> codec) {
        Partition partition;
        try {
            // Only the lookup is guarded, so unrelated failures are not reported as "Bad index".
            partition = partitions.get(Integer.parseInt(indexParam));
        } catch (IndexOutOfBoundsException | NumberFormatException e) {
            return Promise.ofException(HttpError.ofCode(400, "Bad index"));
        }
        return getTask(partition.getAddress(), taskId)
                .map(task -> HttpResponse.ok200().withJson(JsonUtils.toJson(codec, new LocalTaskData(
                        task.getStatus(),
                        task.getGraphViz(),
                        task.getNodes(),
                        task.getStartTime(),
                        task.getFinishTime(),
                        task.getErrorString()))));
    }

    /**
     * Reduces the statistics of one node, gathered from all partitions, into a single value.
     *
     * <p>The reducer is looked up in {@code resourceLocator} under the key
     * {@code StatReducer<X>}, where {@code X} is the runtime class of any non-null element.
     *
     * @param list            per-partition statistics, may contain {@code null} elements
     * @param resourceLocator source of the {@link StatReducer}
     * @return the reduced statistic, or {@code null} if the list has no non-null element or no
     *         matching reducer is available
     */
    @Nullable
    @SuppressWarnings({"unchecked", "rawtypes"}) // the reducer's type argument is known only at runtime
    private static NodeStat reduce(List<NodeStat> list, ResourceLocator resourceLocator) {
        NodeStat sample = list.stream().filter(Objects::nonNull).findAny().orElse(null);
        if (sample == null) {
            return null;
        }
        StatReducer reducer = (StatReducer) resourceLocator.getInstanceOrNull(
                Key.ofType(Types.parameterized(StatReducer.class, new Type[]{sample.getClass()})));
        if (reducer == null) {
            return null;
        }
        return reducer.reduce(list);
    }

    /**
     * Parses the {@code taskID} path parameter of a request.
     *
     * @param httpRequest request carrying the {@code taskID} path parameter
     * @return the parsed ID, or a failed promise carrying a 400 {@link HttpError} if the
     *         parameter is not a valid {@code long}
     */
    private static Promise<Long> getTaskId(HttpRequest httpRequest) {
        String rawId = httpRequest.getPathParameter("taskID");
        try {
            return Promise.of(Long.parseLong(rawId));
        } catch (NumberFormatException e) {
            return Promise.ofException(HttpError.ofCode(400, "Bad number " + rawId));
        }
    }

    /**
     * Requests the task list of the partition at the given address.
     *
     * @param inetSocketAddress address of the partition
     * @return the partition's response, or a failed promise (see {@link #requestTasks})
     */
    private Promise<DataflowResponsePartitionData> getPartitionData(InetSocketAddress inetSocketAddress) {
        return requestTasks(inetSocketAddress, null, DataflowResponsePartitionData.class);
    }

    /**
     * Requests the data of a single task from the partition at the given address.
     *
     * @param inetSocketAddress address of the partition
     * @param j                 ID of the task
     * @return the partition's response, or a failed promise (see {@link #requestTasks})
     */
    private Promise<DataflowResponseTaskData> getTask(InetSocketAddress inetSocketAddress, long j) {
        return requestTasks(inetSocketAddress, j, DataflowResponseTaskData.class);
    }

    /**
     * Connects to a partition, sends a {@link DataflowCommandGetTasks} command and reads one
     * response.
     *
     * <p>The messaging channel is closed both after a response has been received and when
     * sending or receiving fails, so a failed exchange does not leave the connection open.
     *
     * @param address      address of the partition
     * @param taskId       task ID passed to the command, may be {@code null}
     * @param expectedType response type the caller expects
     * @param <T>          expected response type
     * @return the response cast to {@code expectedType}, or a failed promise if the exchange
     *         fails or the response has another type (see {@link #narrow})
     */
    private <T extends DataflowResponse> Promise<T> requestTasks(InetSocketAddress address, @Nullable Long taskId, Class<T> expectedType) {
        return AsyncTcpSocketNio.connect(address).then(socket -> {
            MessagingWithBinaryStreaming<DataflowResponse, DataflowCommand> messaging =
                    MessagingWithBinaryStreaming.create(socket, this.codec);
            return messaging.send(new DataflowCommandGetTasks(taskId))
                    .then(ignored -> messaging.receive())
                    .whenException(e -> messaging.close())
                    .then(response -> {
                        messaging.close();
                        return narrow(response, expectedType, address);
                    });
        });
    }

    /**
     * Converts a raw partition response into the expected type.
     *
     * @param response     response received from the partition
     * @param expectedType response type the caller expects
     * @param address      address of the partition, used in the error message
     * @param <T>          expected response type
     * @return the response as {@code expectedType}; a failed promise with a
     *         {@link DataflowException} describing the remote error if the response is a
     *         {@link DataflowResponseResult}; otherwise a failed promise with a
     *         "Bad response from server" {@link DataflowException}
     */
    private static <T extends DataflowResponse> Promise<T> narrow(DataflowResponse response, Class<T> expectedType, InetSocketAddress address) {
        if (expectedType.isInstance(response)) {
            return Promise.of(expectedType.cast(response));
        }
        if (response instanceof DataflowResponseResult) {
            return Promise.ofException(new DataflowException("Error on remote server " + address + ": " + ((DataflowResponseResult) response).getError()));
        }
        return Promise.ofException(new DataflowException("Bad response from server"));
    }

    /**
     * Delegates the request to the routing servlet built in the constructor.
     *
     * @param httpRequest incoming request
     * @return the response produced by the matching route
     */
    @NotNull
    @Override
    public Promisable<HttpResponse> serve(@NotNull HttpRequest httpRequest) throws UncheckedException {
        return this.servlet.serve(httpRequest);
    }
}
