package io.activej.dataflow.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.net.Messaging;
import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.exception.DataflowException;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.TaskStatus;
import io.activej.dataflow.messaging.DataflowRequest;
import io.activej.dataflow.messaging.DataflowResponse;
import io.activej.dataflow.stats.NodeStat;
import io.activej.dataflow.stats.StatReducer;
import io.activej.http.AsyncServlet;
import io.activej.http.HttpError;
import io.activej.http.HttpMethod;
import io.activej.http.HttpRequest;
import io.activej.http.HttpResponse;
import io.activej.http.RoutingServlet;
import io.activej.http.StaticServlet;
import io.activej.http.loader.IStaticLoader;
import io.activej.inject.Key;
import io.activej.inject.ResourceLocator;
import io.activej.net.socket.tcp.TcpSocket;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.types.Types;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/dataflow/http/DataflowDebugServlet.class */
public final class DataflowDebugServlet extends AbstractReactive implements AsyncServlet {
    private final AsyncServlet servlet;
    private final ByteBufsCodec<DataflowResponse, DataflowRequest> codec;

    public DataflowDebugServlet(Reactor reactor, List<Partition> list, Executor executor, ByteBufsCodec<DataflowResponse, DataflowRequest> byteBufsCodec, ResourceLocator resourceLocator) {
        super(reactor);
        this.codec = byteBufsCodec;
        ObjectMapper registerModule = new ObjectMapper().registerModule(new JavaTimeModule());
        registerModule.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
        this.servlet = (AsyncServlet) RoutingServlet.builder(reactor).with("/*", (AsyncServlet) StaticServlet.builder(reactor, IStaticLoader.ofClassPath(reactor, executor, "debug")).withIndexHtml().build()).with("/api/*", (AsyncServlet) RoutingServlet.builder(reactor).with(HttpMethod.GET, "/partitions", httpRequest -> {
            return HttpResponse.ok200().withJson(registerModule.writeValueAsString(list.stream().map((v0) -> {
                return v0.address();
            }).collect(Collectors.toList()))).toPromise();
        }).with(HttpMethod.GET, "/tasks", httpRequest2 -> {
            return Promises.toList(list.stream().map(partition -> {
                return getPartitionData(partition.address());
            })).then(list2 -> {
                HashMap hashMap = new HashMap();
                for (int i = 0; i < list2.size(); i++) {
                    for (DataflowResponse.TaskDescription taskDescription : ((DataflowResponse.PartitionData) list2.get(i)).lastTasks()) {
                        ((List) hashMap.computeIfAbsent(Long.valueOf(taskDescription.id()), l -> {
                            return Arrays.asList(new TaskStatus[list2.size()]);
                        })).set(i, taskDescription.status());
                    }
                }
                return HttpResponse.ok200().withJson(registerModule.writeValueAsString(hashMap)).toPromise();
            });
        }).with(HttpMethod.GET, "/tasks/:taskID", httpRequest3 -> {
            long taskId = getTaskId(httpRequest3);
            return Promises.toList((List) list.stream().map(partition -> {
                return getTask(partition.address(), taskId);
            }).collect(Collectors.toList())).then(list2 -> {
                List asList = Arrays.asList(new TaskStatus[list2.size()]);
                HashMap hashMap = new HashMap();
                for (int i = 0; i < list2.size(); i++) {
                    DataflowResponse.TaskData taskData = (DataflowResponse.TaskData) list2.get(i);
                    if (taskData != null) {
                        asList.set(i, taskData.status());
                        int i2 = i;
                        taskData.nodes().forEach((num, nodeStat) -> {
                            ((List) hashMap.computeIfAbsent(num, num -> {
                                return Arrays.asList(new NodeStat[list2.size()]);
                            })).set(i2, nodeStat);
                        });
                    }
                }
                return HttpResponse.ok200().withJson(registerModule.writeValueAsString(new ReducedTaskData(asList, ((DataflowResponse.TaskData) list2.get(0)).graphViz(), (Map) hashMap.entrySet().stream().collect(HashMap::new, (hashMap2, entry) -> {
                    NodeStat reduce = reduce((List) entry.getValue(), resourceLocator);
                    if (reduce != null) {
                        hashMap2.put((Integer) entry.getKey(), reduce);
                    }
                }, (v0, v1) -> {
                    v0.putAll(v1);
                })))).toPromise();
            });
        }).with(HttpMethod.GET, "/tasks/:taskID/:index", httpRequest4 -> {
            try {
                return getTask(((Partition) list.get(Integer.parseInt(httpRequest4.getPathParameter("index")))).address(), getTaskId(httpRequest4)).then(taskData -> {
                    return HttpResponse.ok200().withJson(registerModule.writeValueAsString(new LocalTaskData(taskData.status(), taskData.graphViz(), (Map) taskData.nodes().entrySet().stream().collect(Collectors.toMap((v0) -> {
                        return v0.getKey();
                    }, (v0) -> {
                        return v0.getValue();
                    })), taskData.startTime(), taskData.finishTime(), taskData.error()))).toPromise();
                });
            } catch (IndexOutOfBoundsException | NumberFormatException e) {
                throw HttpError.ofCode(400, "Bad index");
            }
        }).build()).build();
    }

    @Nullable
    private static NodeStat reduce(List<NodeStat> list, ResourceLocator resourceLocator) {
        StatReducer statReducer;
        Optional<NodeStat> findAny = list.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).findAny();
        if (findAny.isEmpty() || (statReducer = (StatReducer) resourceLocator.getInstanceOrNull(Key.ofType(Types.parameterizedType(StatReducer.class, new Type[]{findAny.get().getClass()})))) == null) {
            return null;
        }
        return statReducer.reduce(list);
    }

    private static long getTaskId(HttpRequest httpRequest) throws HttpError {
        String pathParameter = httpRequest.getPathParameter("taskID");
        try {
            return Long.parseLong(pathParameter);
        } catch (NumberFormatException e) {
            throw HttpError.ofCode(400, "Bad number " + pathParameter);
        }
    }

    private Promise<DataflowResponse.PartitionData> getPartitionData(InetSocketAddress inetSocketAddress) {
        return TcpSocket.connect(Reactor.getCurrentReactor(), inetSocketAddress).then(tcpSocket -> {
            Messaging create = Messaging.create(tcpSocket, this.codec);
            Promise then = DataflowClient.performHandshake(create).then(() -> {
                return create.send(new DataflowRequest.GetTasks(null));
            });
            Objects.requireNonNull(create);
            return then.then(create::receive).map(dataflowResponse -> {
                create.close();
                if (dataflowResponse instanceof DataflowResponse.PartitionData) {
                    return (DataflowResponse.PartitionData) dataflowResponse;
                }
                if (dataflowResponse instanceof DataflowResponse.Result) {
                    throw new DataflowException("Error on remote server " + inetSocketAddress + ": " + ((DataflowResponse.Result) dataflowResponse).error());
                }
                throw new DataflowException("Bad response from server");
            });
        });
    }

    private Promise<DataflowResponse.TaskData> getTask(InetSocketAddress inetSocketAddress, long j) {
        return TcpSocket.connect(Reactor.getCurrentReactor(), inetSocketAddress).then(tcpSocket -> {
            Messaging create = Messaging.create(tcpSocket, this.codec);
            return DataflowClient.performHandshake(create).then(() -> {
                return create.send(new DataflowRequest.GetTasks(Long.valueOf(j)));
            }).then(r3 -> {
                return create.receive();
            }).map(dataflowResponse -> {
                create.close();
                if (dataflowResponse instanceof DataflowResponse.TaskData) {
                    return (DataflowResponse.TaskData) dataflowResponse;
                }
                if (dataflowResponse instanceof DataflowResponse.Result) {
                    throw new DataflowException("Error on remote server " + inetSocketAddress + ": " + ((DataflowResponse.Result) dataflowResponse).error());
                }
                throw new DataflowException("Bad response from server");
            });
        });
    }

    public Promise<HttpResponse> serve(HttpRequest httpRequest) throws Exception {
        Reactive.checkInReactorThread(this);
        return this.servlet.serve(httpRequest);
    }
}
