package io.streamthoughts.jikkou.kafka.connect.action;

import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.common.utils.Either;
import io.streamthoughts.jikkou.core.action.Action;
import io.streamthoughts.jikkou.core.action.ExecutionError;
import io.streamthoughts.jikkou.core.action.ExecutionResult;
import io.streamthoughts.jikkou.core.action.ExecutionResultSet;
import io.streamthoughts.jikkou.core.action.ExecutionStatus;
import io.streamthoughts.jikkou.core.annotation.Description;
import io.streamthoughts.jikkou.core.annotation.Named;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.Title;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionOptionSpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionSpec;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectExtensionConfig;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectLabels;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectClientConfig;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.exception.KafkaConnectClusterNotFoundException;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import io.streamthoughts.jikkou.kafka.connect.service.KafkaConnectClusterService;
import jakarta.ws.rs.core.Response;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SupportedResource(type = V1KafkaConnector.class)
@Named(KafkaConnectRestartConnectorsAction.NAME)
@Description("The KafkaConnectRestartConnectors action a user to restart all or just the failed Connector and Task instances for one or multiple named connectors.")
@Title("Restart Kafka Connect connector instances and task instances")
@ExtensionSpec(options = {@ExtensionOptionSpec(name = KafkaConnectRestartConnectorsAction.CONNECTOR_NAME_CONFIG, description = "The connector's name.", type = List.class, required = false), @ExtensionOptionSpec(name = "connect-cluster", description = "The name of the connect cluster.", type = String.class, required = false), @ExtensionOptionSpec(name = KafkaConnectRestartConnectorsAction.INCLUDE_TASKS_CONFIG, description = "Specifies whether to restart the connector instance and task instances (includeTasks=true) or just the connector instance (includeTasks=false)", type = Boolean.class, required = false), @ExtensionOptionSpec(name = KafkaConnectRestartConnectorsAction.ONLY_FAILED_CONFIG, description = "Specifies whether to restart just the instances with a FAILED status (onlyFailed=true) or all instances (onlyFailed=false)", type = Boolean.class, required = false)})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/action/KafkaConnectRestartConnectorsAction.class */
public class KafkaConnectRestartConnectorsAction extends ContextualExtension implements Action<V1KafkaConnector> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaConnectRestartConnectorsAction.class);
    public static final String NAME = "KafkaConnectRestartConnectors";
    public static final String CONNECTOR_NAME_CONFIG = "connector-name";
    public static final String CONNECT_CLUSTER_CONFIG = "connect-cluster";
    public static final String INCLUDE_TASKS_CONFIG = "include-tasks";
    public static final String ONLY_FAILED_CONFIG = "only-failed";
    private KafkaConnectExtensionConfig kafkaConnectExtensionConfig;

    @Override // io.streamthoughts.jikkou.core.extension.ContextualExtension, io.streamthoughts.jikkou.core.extension.Extension
    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        this.kafkaConnectExtensionConfig = new KafkaConnectExtensionConfig(extensionContext().appConfiguration());
    }

    @Override // io.streamthoughts.jikkou.core.action.Action
    @NotNull
    public ExecutionResultSet<V1KafkaConnector> execute(@NotNull Configuration configuration) {
        ExecutionResultSet<V1KafkaConnector> build;
        boolean booleanValue = ((Boolean) extensionContext().configProperty(INCLUDE_TASKS_CONFIG).getOptional(configuration).orElse(false)).booleanValue();
        boolean booleanValue2 = ((Boolean) extensionContext().configProperty(ONLY_FAILED_CONFIG).getOptional(configuration).orElse(false)).booleanValue();
        Set<String> connectClusters = getConnectClusters(configuration);
        ExecutorService newVirtualThreadPerTaskExecutor = Executors.newVirtualThreadPerTaskExecutor();
        try {
            List list = connectClusters.stream().map(str -> {
                return CompletableFuture.supplyAsync(() -> {
                    return restartConnectors(configuration, str, booleanValue, booleanValue2);
                }, newVirtualThreadPerTaskExecutor);
            }).toList();
            newVirtualThreadPerTaskExecutor.shutdown();
            Either either = AsyncUtils.get(AsyncUtils.waitForAll(list).thenApply(list2 -> {
                return list2.stream().flatMap((v0) -> {
                    return v0.stream();
                }).toList();
            }));
            Objects.requireNonNull(either);
            switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), Either.Left.class, Either.Right.class).dynamicInvoker().invoke(either, 0) /* invoke-custom */) {
                case 0:
                    build = ExecutionResultSet.newBuilder().results((List) ((Either.Left) either).left().get()).build();
                    break;
                case 1:
                    build = ExecutionResultSet.newBuilder().result(ExecutionResult.newBuilder().status(ExecutionStatus.FAILED).errors(List.of(new ExecutionError(((Throwable) ((Either.Right) either).right().get()).getLocalizedMessage()))).build()).build();
                    break;
                default:
                    throw new MatchException((String) null, (Throwable) null);
            }
            ExecutionResultSet<V1KafkaConnector> executionResultSet = build;
            if (newVirtualThreadPerTaskExecutor != null) {
                newVirtualThreadPerTaskExecutor.close();
            }
            return executionResultSet;
        } catch (Throwable th) {
            if (newVirtualThreadPerTaskExecutor != null) {
                try {
                    newVirtualThreadPerTaskExecutor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private List<ExecutionResult<V1KafkaConnector>> restartConnectors(@NotNull Configuration configuration, @NotNull String str, boolean z, boolean z2) {
        KafkaConnectApi create = KafkaConnectApiFactory.create(getKafkaConnectClientConfig(str));
        try {
            KafkaConnectClusterService kafkaConnectClusterService = new KafkaConnectClusterService(str, create);
            List<ExecutionResult<V1KafkaConnector>> list = (List) getConnectorsFromClusterOrConfig(configuration, create).stream().map(str2 -> {
                return restartConnector(str, str2, z, z2, create, kafkaConnectClusterService);
            }).collect(Collectors.toList());
            create.close();
            return list;
        } catch (Throwable th) {
            create.close();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ExecutionResult<V1KafkaConnector> restartConnector(String str, String str2, boolean z, boolean z2, KafkaConnectApi kafkaConnectApi, KafkaConnectClusterService kafkaConnectClusterService) {
        ExecutionResult<V1KafkaConnector> build;
        try {
            Response restartConnector = kafkaConnectApi.restartConnector(str2, z, z2);
            int status = restartConnector.getStatus();
            if (status == 202 || status == 204) {
                build = ExecutionResult.newBuilder().status(ExecutionStatus.SUCCEEDED).data((V1KafkaConnector) AsyncUtils.getValue(kafkaConnectClusterService.getConnectorAsync(str2, true)).orElse(V1KafkaConnector.builder().withMetadata(ObjectMeta.builder().withName(str2).withLabel(KafkaConnectLabels.KAFKA_CONNECT_CLUSTER, str).build()).build())).build();
            } else {
                ErrorResponse errorResponse = (ErrorResponse) restartConnector.readEntity(ErrorResponse.class);
                build = ExecutionResult.newBuilder().status(ExecutionStatus.FAILED).errors(List.of(new ExecutionError(errorResponse.message(), Integer.valueOf(errorResponse.errorCode())))).build();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LOG.error("Failed to restart connectorName '{}' on connect clusterName {} (includeTasks={}, onlyFailed={}).", str2, str, Boolean.valueOf(z), Boolean.valueOf(z2), e);
            build = ExecutionResult.newBuilder().status(ExecutionStatus.FAILED).errors(List.of(new ExecutionError(e.getLocalizedMessage()))).build();
        }
        return build;
    }

    private KafkaConnectClientConfig getKafkaConnectClientConfig(@NotNull String str) {
        return this.kafkaConnectExtensionConfig.getConfigForCluster(str).orElseThrow(() -> {
            return new KafkaConnectClusterNotFoundException("No connect cluster configured for name '" + str + "'");
        });
    }

    @NotNull
    private List<String> getConnectorsFromClusterOrConfig(@NotNull Configuration configuration, @NotNull KafkaConnectApi kafkaConnectApi) {
        Optional optional = extensionContext().configProperty(CONNECTOR_NAME_CONFIG).getOptional(configuration);
        Objects.requireNonNull(kafkaConnectApi);
        return (List) optional.orElseGet(kafkaConnectApi::listConnectors);
    }

    @NotNull
    private Set<String> getConnectClusters(@NotNull Configuration configuration) {
        return (Set) extensionContext().configProperty("connect-cluster").getOptional(configuration).map(list -> {
            return new HashSet(list);
        }).orElseGet(() -> {
            return this.kafkaConnectExtensionConfig.getClusters();
        });
    }
}
