package io.streamthoughts.jikkou.kafka.action;

import io.streamthoughts.jikkou.common.utils.Strings;
import io.streamthoughts.jikkou.core.action.Action;
import io.streamthoughts.jikkou.core.action.ExecutionError;
import io.streamthoughts.jikkou.core.action.ExecutionResult;
import io.streamthoughts.jikkou.core.action.ExecutionResultSet;
import io.streamthoughts.jikkou.core.action.ExecutionStatus;
import io.streamthoughts.jikkou.core.annotation.Description;
import io.streamthoughts.jikkou.core.annotation.Named;
import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.annotation.Title;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.extension.ContextualExtension;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionOptionSpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionSpec;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientFactory;
import io.streamthoughts.jikkou.kafka.internals.admin.DefaultAdminClientFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroup;
import io.streamthoughts.jikkou.kafka.reconciler.KafkaClientConfiguration;
import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaConsumerGroupService;
import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaOffsetSpec;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SupportedResource(type = V1KafkaConsumerGroup.class)
@Named("KafkaConsumerGroupsResetOffsets")
@Description("Reset offsets of consumer group. Supports one consumer group at the time, and group should be in EMPTY state.\nYou must choose one of the following reset specifications: to-datetime, by-duration, to-earliest, to-latest, to-offset.\n")
@Title("Reset offsets of consumer group.")
@ExtensionSpec(options = {@ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.GROUP, description = "The consumer group to act on.", type = String.class, required = true), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.TOPIC, description = "The topic whose partitions must be included in the reset-offset action.", type = List.class, required = true), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.TO_DATETIME, description = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'", type = String.class, defaultValue = "_NULL_"), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.TO_EARLIEST, description = "Reset offsets to earliest offset.", type = Boolean.class, defaultValue = "_NULL_"), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.TO_LATEST, description = "Reset offsets to latest offset.", type = Boolean.class, defaultValue = "_NULL_"), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.TO_OFFSET, description = "Reset offsets to a specific offset.", type = Long.class, defaultValue = "_NULL_"), @ExtensionOptionSpec(name = KafkaConsumerGroupsResetOffsets.DRY_RUN, description = "Only show results without executing changes on Consumer Groups.", type = Boolean.class, defaultValue = "false")})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/action/KafkaConsumerGroupsResetOffsets.class */
public final class KafkaConsumerGroupsResetOffsets extends ContextualExtension implements Action<V1KafkaConsumerGroup> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerGroupsResetOffsets.class);
    public static final String TO_EARLIEST = "to-earliest";
    public static final String TO_LATEST = "to-latest";
    public static final String TO_DATETIME = "to-datetime";
    public static final String TOPIC = "topic";
    public static final String GROUP = "group";
    public static final String TO_OFFSET = "to-offset";
    public static final String DRY_RUN = "dry-run";
    private AdminClientFactory adminClientFactory;

    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        this.adminClientFactory = new DefaultAdminClientFactory((Supplier<Map<String, Object>>) () -> {
            return (Map) KafkaClientConfiguration.ADMIN_CLIENT_CONFIG.get(extensionContext.appConfiguration());
        });
    }

    @NotNull
    public ExecutionResultSet<V1KafkaConsumerGroup> execute(@NotNull Configuration configuration) {
        AdminClient createAdminClient = this.adminClientFactory.createAdminClient();
        try {
            KafkaConsumerGroupService kafkaConsumerGroupService = new KafkaConsumerGroupService(createAdminClient);
            KafkaOffsetSpec kafkaOffsetSpec = (KafkaOffsetSpec) extensionContext().configProperty(TO_OFFSET).getOptional(configuration).map(l -> {
                return new KafkaOffsetSpec.ToOffset(l);
            }).orElse((KafkaOffsetSpec) extensionContext().configProperty(TO_DATETIME).getOptional(configuration).filter(Predicate.not(Strings::isBlank)).map(str -> {
                return KafkaOffsetSpec.ToTimestamp.fromISODateTime(str);
            }).orElse((KafkaOffsetSpec) extensionContext().configProperty(TO_LATEST).getOptional(configuration).map(bool -> {
                return new KafkaOffsetSpec.ToLatest();
            }).orElse((KafkaOffsetSpec) extensionContext().configProperty(TO_EARLIEST).getOptional(configuration).map(bool2 -> {
                return new KafkaOffsetSpec.ToEarliest();
            }).orElse(null))));
            if (kafkaOffsetSpec == null) {
                ExecutionResultSet<V1KafkaConsumerGroup> build = ExecutionResultSet.newBuilder().result(ExecutionResult.newBuilder().status(ExecutionStatus.FAILED).errors(List.of(new ExecutionError("No reset specification for offsets: One of these options is expected: [to-datetime, by-duration, to-earliest, to-latest, to-offset]."))).build()).build();
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
                return build;
            }
            try {
                String str2 = (String) extensionContext().configProperty(GROUP).get(configuration);
                List<String> list = (List) extensionContext().configProperty(TOPIC).get(configuration);
                Boolean bool3 = (Boolean) extensionContext().configProperty(DRY_RUN).get(configuration);
                if (LOG.isInfoEnabled()) {
                    LOG.info("Alter consumer group '{}' for topics '{}', and offsets: {} (DRY_RUN: {}).", new Object[]{str2, list, kafkaOffsetSpec, bool3});
                }
                ExecutionResultSet<V1KafkaConsumerGroup> build2 = ExecutionResultSet.newBuilder().result(ExecutionResult.newBuilder().status(ExecutionStatus.SUCCEEDED).data(kafkaConsumerGroupService.resetConsumerGroupOffsets(str2, list, kafkaOffsetSpec, bool3.booleanValue())).build()).build();
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
                return build2;
            } catch (Exception e) {
                ExecutionResultSet<V1KafkaConsumerGroup> build3 = ExecutionResultSet.newBuilder().result(ExecutionResult.newBuilder().status(ExecutionStatus.FAILED).errors(List.of(new ExecutionError(e.getLocalizedMessage()))).build()).build();
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
                return build3;
            }
        } catch (Throwable th) {
            if (createAdminClient != null) {
                try {
                    createAdminClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
