package io.streamthoughts.jikkou.kafka.connect.change;

import io.streamthoughts.jikkou.core.models.HasMetadataChange;
import io.streamthoughts.jikkou.core.reconcilier.ChangeDescription;
import io.streamthoughts.jikkou.core.reconcilier.ChangeError;
import io.streamthoughts.jikkou.core.reconcilier.ChangeHandler;
import io.streamthoughts.jikkou.core.reconcilier.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconcilier.ChangeResponse;
import io.streamthoughts.jikkou.core.reconcilier.ChangeType;
import io.streamthoughts.jikkou.http.client.RestClientException;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.class */
public final class KafkaConnectorChangeHandler implements ChangeHandler<KafkaConnectorChange> {
    private final KafkaConnectApi api;
    private final String cluster;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler$1, reason: invalid class name */
    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType;

        static {
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.PAUSED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.UNASSIGNED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.RESTARTING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType = new int[ChangeType.values().length];
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[ChangeType.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[ChangeType.IGNORE.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[ChangeType.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[ChangeType.ADD.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[ChangeType.DELETE.ordinal()] = 5;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    public KafkaConnectorChangeHandler(@NotNull KafkaConnectApi kafkaConnectApi, @NotNull String str) {
        this.api = (KafkaConnectApi) Objects.requireNonNull(kafkaConnectApi);
        this.cluster = str;
    }

    public Set<ChangeType> supportedChangeTypes() {
        return Set.of(ChangeType.ADD, ChangeType.DELETE, ChangeType.UPDATE);
    }

    public List<ChangeResponse<KafkaConnectorChange>> apply(@NotNull List<HasMetadataChange<KafkaConnectorChange>> list) {
        return list.stream().flatMap(this::handleChange).toList();
    }

    private Stream<ChangeResponse<KafkaConnectorChange>> handleChange(HasMetadataChange<KafkaConnectorChange> hasMetadataChange) {
        switch (AnonymousClass1.$SwitchMap$io$streamthoughts$jikkou$core$reconcilier$ChangeType[((KafkaConnectorChange) hasMetadataChange.getChange()).operation().ordinal()]) {
            case 1:
            case 2:
                return Stream.empty();
            case 3:
                return updateConnector(hasMetadataChange);
            case 4:
                return createOrUpdateConnectorConfig(hasMetadataChange);
            case 5:
                return deleteConnector(hasMetadataChange);
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    @NotNull
    private Stream<ChangeResponse<KafkaConnectorChange>> updateConnector(HasMetadataChange<KafkaConnectorChange> hasMetadataChange) {
        Optional empty;
        KafkaConnectorChange kafkaConnectorChange = (KafkaConnectorChange) hasMetadataChange.getChange();
        if (!kafkaConnectorChange.isStateOnlyChange()) {
            return createOrUpdateConnectorConfig(hasMetadataChange);
        }
        KafkaConnectorState kafkaConnectorState = (KafkaConnectorState) kafkaConnectorChange.state().getAfter();
        String name = kafkaConnectorChange.name();
        switch (kafkaConnectorState) {
            case PAUSED:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.pauseConnector(name);
                }));
                break;
            case STOPPED:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.stopConnector(name);
                }));
                break;
            case RUNNING:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.resumeConnector(name);
                }));
                break;
            case UNASSIGNED:
            case RESTARTING:
            case FAILED:
                empty = Optional.empty();
                break;
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
        return (Stream) empty.map(completableFuture -> {
            return toChangeResponse(hasMetadataChange, completableFuture);
        }).map((v0) -> {
            return Stream.of(v0);
        }).orElse(Stream.empty());
    }

    @NotNull
    private Stream<ChangeResponse<KafkaConnectorChange>> deleteConnector(HasMetadataChange<KafkaConnectorChange> hasMetadataChange) {
        return Stream.of(toChangeResponse(hasMetadataChange, CompletableFuture.runAsync(() -> {
            this.api.deleteConnector(((KafkaConnectorChange) hasMetadataChange.getChange()).name());
        })));
    }

    @NotNull
    private Stream<ChangeResponse<KafkaConnectorChange>> createOrUpdateConnectorConfig(HasMetadataChange<KafkaConnectorChange> hasMetadataChange) {
        return Stream.of(toChangeResponse(hasMetadataChange, CompletableFuture.supplyAsync(() -> {
            return this.api.createOrUpdateConnector(((KafkaConnectorChange) hasMetadataChange.getChange()).name(), buildConnectorConfig((KafkaConnectorChange) hasMetadataChange.getChange()));
        })));
    }

    public ChangeDescription getDescriptionFor(@NotNull HasMetadataChange<KafkaConnectorChange> hasMetadataChange) {
        return new KafkaConnectorChangeDescription(this.cluster, (KafkaConnectorChange) hasMetadataChange.getChange());
    }

    private Map<String, Object> buildConnectorConfig(KafkaConnectorChange kafkaConnectorChange) {
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG, kafkaConnectorChange.tasksMax().getAfter());
        hashMap.put(KafkaConnectConstants.CONNECTOR_CLASS_CONFIG, kafkaConnectorChange.connectorClass().getAfter());
        hashMap.putAll((Map) kafkaConnectorChange.config().stream().filter(configEntryChange -> {
            return configEntryChange.operation() != ChangeType.DELETE;
        }).collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, configEntryChange2 -> {
            return configEntryChange2.valueChange().getAfter();
        })));
        return hashMap;
    }

    private ChangeResponse<KafkaConnectorChange> toChangeResponse(HasMetadataChange<KafkaConnectorChange> hasMetadataChange, CompletableFuture<?> completableFuture) {
        return new ChangeResponse<>(hasMetadataChange, completableFuture.handle((obj, th) -> {
            if (th == null) {
                return ChangeMetadata.empty();
            }
            if (th.getCause() != null) {
                th = th.getCause();
            }
            if (!(th instanceof RestClientException)) {
                return ChangeMetadata.of(th);
            }
            ErrorResponse errorResponse = (ErrorResponse) ((RestClientException) th).getResponseEntity(ErrorResponse.class);
            return new ChangeMetadata(new ChangeError(errorResponse.message(), Integer.valueOf(errorResponse.errorCode())));
        }));
    }
}
