package io.streamthoughts.jikkou.kafka.connect.change;

import io.streamthoughts.jikkou.core.data.TypeConverter;
import io.streamthoughts.jikkou.core.models.change.ResourceChange;
import io.streamthoughts.jikkou.core.models.change.ResourceChangeSpec;
import io.streamthoughts.jikkou.core.models.change.SpecificStateChange;
import io.streamthoughts.jikkou.core.models.change.StateChange;
import io.streamthoughts.jikkou.core.reconciler.Change;
import io.streamthoughts.jikkou.core.reconciler.ChangeError;
import io.streamthoughts.jikkou.core.reconciler.ChangeMetadata;
import io.streamthoughts.jikkou.core.reconciler.ChangeResponse;
import io.streamthoughts.jikkou.core.reconciler.Operation;
import io.streamthoughts.jikkou.core.reconciler.TextDescription;
import io.streamthoughts.jikkou.core.reconciler.change.BaseChangeHandler;
import io.streamthoughts.jikkou.http.client.RestClientException;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.data.ErrorResponse;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler.class */
public final class KafkaConnectorChangeHandler extends BaseChangeHandler<ResourceChange> {
    private final KafkaConnectApi api;
    private final String cluster;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler$1, reason: invalid class name */
    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/change/KafkaConnectorChangeHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation;

        static {
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.PAUSED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.STOPPED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.UNASSIGNED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.RESTARTING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$kafka$connect$models$KafkaConnectorState[KafkaConnectorState.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation = new int[Operation.values().length];
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation[Operation.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation[Operation.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation[Operation.CREATE.ordinal()] = 3;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation[Operation.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    public KafkaConnectorChangeHandler(@NotNull KafkaConnectApi kafkaConnectApi, @NotNull String str) {
        super(Set.of(Operation.CREATE, Operation.DELETE, Operation.UPDATE));
        this.api = (KafkaConnectApi) Objects.requireNonNull(kafkaConnectApi);
        this.cluster = str;
    }

    public List<ChangeResponse<ResourceChange>> handleChanges(@NotNull List<ResourceChange> list) {
        return list.stream().flatMap(this::handleChange).toList();
    }

    private Stream<ChangeResponse<ResourceChange>> handleChange(ResourceChange resourceChange) {
        switch (AnonymousClass1.$SwitchMap$io$streamthoughts$jikkou$core$reconciler$Operation[((ResourceChangeSpec) resourceChange.getSpec()).getOp().ordinal()]) {
            case 1:
                return Stream.empty();
            case 2:
                return updateConnector(resourceChange);
            case 3:
                return createOrUpdateConnectorConfig(resourceChange);
            case 4:
                return deleteConnector(resourceChange);
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    @NotNull
    private Stream<ChangeResponse<ResourceChange>> updateConnector(ResourceChange resourceChange) {
        Optional empty;
        if (!isStateOnlyChange(resourceChange)) {
            return createOrUpdateConnectorConfig(resourceChange);
        }
        KafkaConnectorState kafkaConnectorState = (KafkaConnectorState) getState(resourceChange).getAfter();
        String name = resourceChange.getMetadata().getName();
        switch (kafkaConnectorState) {
            case PAUSED:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.pauseConnector(name);
                }));
                break;
            case STOPPED:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.stopConnector(name);
                }));
                break;
            case RUNNING:
                empty = Optional.of(CompletableFuture.runAsync(() -> {
                    this.api.resumeConnector(name);
                }));
                break;
            case UNASSIGNED:
            case RESTARTING:
            case FAILED:
                empty = Optional.empty();
                break;
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
        return empty.map(completableFuture -> {
            return toChangeResponse(resourceChange, completableFuture);
        }).stream();
    }

    @NotNull
    private Stream<ChangeResponse<ResourceChange>> deleteConnector(ResourceChange resourceChange) {
        return Stream.of(toChangeResponse(resourceChange, CompletableFuture.runAsync(() -> {
            this.api.deleteConnector(resourceChange.getMetadata().getName());
        })));
    }

    @NotNull
    private Stream<ChangeResponse<ResourceChange>> createOrUpdateConnectorConfig(ResourceChange resourceChange) {
        return Stream.of(toChangeResponse(resourceChange, CompletableFuture.supplyAsync(() -> {
            return this.api.createOrUpdateConnector(resourceChange.getMetadata().getName(), buildConnectorConfig(resourceChange));
        })));
    }

    public TextDescription describe(@NotNull ResourceChange resourceChange) {
        return new KafkaConnectorChangeDescription(this.cluster, resourceChange);
    }

    @VisibleForTesting
    static boolean isStateOnlyChange(ResourceChange resourceChange) {
        return ((ResourceChangeSpec) resourceChange.getSpec()).getOp() == Operation.UPDATE && getConnectorClass(resourceChange).getOp() == Operation.NONE && getTasksMax(resourceChange).getOp() == Operation.NONE && Change.computeOperation(getConfig(resourceChange)) == Operation.NONE && getState(resourceChange).getOp() != Operation.NONE;
    }

    private Map<String, Object> buildConnectorConfig(ResourceChange resourceChange) {
        Map map = (Map) getConfig(resourceChange).stream().collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, (v0) -> {
            return v0.getAfter();
        }));
        HashMap hashMap = new HashMap();
        hashMap.put(KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG, getTasksMax(resourceChange).getAfter());
        hashMap.put(KafkaConnectConstants.CONNECTOR_CLASS_CONFIG, getConnectorClass(resourceChange).getAfter());
        hashMap.putAll(map);
        return hashMap;
    }

    private static List<StateChange> getConfig(ResourceChange resourceChange) {
        return ((ResourceChangeSpec) resourceChange.getSpec()).getChanges().allWithPrefix(KafkaConnectorChangeComputer.DATA_CONFIG_PREFIX).all();
    }

    private static SpecificStateChange<KafkaConnectorState> getState(ResourceChange resourceChange) {
        return ((ResourceChangeSpec) resourceChange.getSpec()).getChanges().getLast(KafkaConnectorChangeComputer.DATA_STATE, TypeConverter.of(KafkaConnectorState.class));
    }

    private static SpecificStateChange<String> getConnectorClass(ResourceChange resourceChange) {
        return ((ResourceChangeSpec) resourceChange.getSpec()).getChanges().getLast(KafkaConnectorChangeComputer.DATA_CONNECTOR_CLASS, TypeConverter.String());
    }

    private static SpecificStateChange<Integer> getTasksMax(ResourceChange resourceChange) {
        return ((ResourceChangeSpec) resourceChange.getSpec()).getChanges().getLast(KafkaConnectorChangeComputer.DATA_TASKS_MAX, TypeConverter.Integer());
    }

    private ChangeResponse<ResourceChange> toChangeResponse(ResourceChange resourceChange, CompletableFuture<?> completableFuture) {
        return new ChangeResponse<>(resourceChange, completableFuture.handle((obj, th) -> {
            if (th == null) {
                return ChangeMetadata.empty();
            }
            if (th.getCause() != null) {
                th = th.getCause();
            }
            if (!(th instanceof RestClientException)) {
                return ChangeMetadata.of(th);
            }
            ErrorResponse errorResponse = (ErrorResponse) ((RestClientException) th).getResponseEntity(ErrorResponse.class);
            return new ChangeMetadata(new ChangeError(errorResponse.message(), Integer.valueOf(errorResponse.errorCode())));
        }));
    }
}
