package io.es4j.infra.redis;

import com.google.auto.service.AutoService;
import io.es4j.Aggregate;
import io.es4j.Es4jDeployment;
import io.es4j.core.CommandHandler;
import io.es4j.core.objects.ErrorSource;
import io.es4j.core.objects.Es4jErrorBuilder;
import io.es4j.infrastructure.EventStore;
import io.es4j.infrastructure.models.AggregateEventStream;
import io.es4j.infrastructure.models.AppendInstruction;
import io.es4j.infrastructure.models.Event;
import io.es4j.infrastructure.models.EventStream;
import io.es4j.infrastructure.models.PruneEventStream;
import io.es4j.infrastructure.models.StartStream;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.core.impl.cpu.CpuCoreSensor;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.redis.client.Redis;
import io.vertx.mutiny.redis.client.RedisAPI;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.RedisOptions;
import io.vertx.redis.client.ResponseType;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventStore} implementation backed by Redis streams.
 *
 * <p>Events of one aggregate instance are written with {@code XADD} to a stream whose key is
 * {@code <AggregateSimpleName>-<aggregateId>-<tenantId>} and read back with {@code XREAD}.
 * Started streams are additionally registered with {@code SADD} in a set named after the
 * aggregate's simple class name.
 *
 * <p>{@link #start(Es4jDeployment, Vertx, JsonObject)} must be called before any other operation,
 * because it creates the Redis client and records the aggregate class used to build stream keys.
 */
@AutoService({EventStore.class})
public class RedisEventStore implements EventStore {
    public static final String TENANT_ID = "tenant-id";
    public static final String EVENT_CLASS = "event-class";
    public static final String COMMAND_ID = "command-id";
    public static final String EVENT = "event";
    public static final String TAGS = "tags";
    public static final String SCHEMA_VERSION = "schema-version";
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisEventStore.class);
    private Redis redisClient;
    private RedisAPI redisApi;
    private Class<? extends Aggregate> aggregateClass;

    /**
     * Reads the events of one aggregate stream that come after the requested version offset.
     *
     * @param aggregateEventStream identifies the aggregate, tenant and version offset to read from
     * @return the decoded events; an empty list when Redis replies with nil (no newer entries)
     */
    public <T extends Aggregate> Uni<List<Event>> fetch(AggregateEventStream<T> aggregateEventStream) {
        return this.redisApi.xread(parseStreamArguments(aggregateEventStream)).map(response -> {
            LOGGER.debug("Reading from stream -> {}", response);
            return toEvents(response);
        });
    }

    /**
     * Reads the events of one aggregate stream and hands each of them to {@code consumer}.
     *
     * @param aggregateEventStream identifies the aggregate, tenant and version offset to read from
     * @param consumer             callback invoked once per decoded event
     * @return a {@link Uni} completing once every event has been passed to the consumer
     */
    public <T extends Aggregate> Uni<Void> stream(AggregateEventStream<T> aggregateEventStream, Consumer<Event> consumer) {
        return this.redisApi.xread(parseStreamArguments(aggregateEventStream)).map(response -> {
            toEvents(response).forEach(consumer);
            return response;
        }).replaceWithVoid();
    }

    /**
     * Appends the instruction's events to the aggregate stream, one {@code XADD} per event.
     * Events are written sequentially, in iteration order.
     *
     * @param appendInstruction the aggregate coordinates and the events to append
     * @return a {@link Uni} completing when all events have been written
     */
    public <T extends Aggregate> Uni<Void> append(AppendInstruction<T> appendInstruction) {
        final String streamKey = aggregateStream(appendInstruction);
        return Multi.createFrom().iterable(appendInstruction.events())
            .onItem().transformToUniAndConcatenate(event ->
                // Validate each reply so a Redis error is not silently dropped.
                this.redisApi.xadd(mapArgs(event, streamKey)).map(RedisEventStore::checkResponse))
            .collect().asList()
            .replaceWithVoid();
    }

    /**
     * Registers the aggregate stream key in the Redis set named after the aggregate class.
     *
     * @param startStream the aggregate coordinates of the stream being started
     * @return a {@link Uni} completing once the key has been added
     */
    public <T extends Aggregate> Uni<Void> startStream(StartStream<T> startStream) {
        final List<String> args = List.of(this.aggregateClass.getSimpleName(), aggregateStream(startStream));
        return this.redisApi.sadd(args).map(RedisEventStore::checkResponse).replaceWithVoid();
    }

    /**
     * Fails with a {@link RedisEventStoreException} when Redis answered with an error reply.
     *
     * @param response the reply to validate; may be {@code null} (nil reply)
     * @return the same response, untouched
     */
    private static Response checkResponse(Response response) {
        if (response != null && response.type() == ResponseType.ERROR) {
            throw new RedisEventStoreException(response.toBuffer().toString());
        }
        return response;
    }

    /**
     * Validates an {@code XREAD} reply and decodes it into events.
     * A {@code null} reply (nil, i.e. nothing to read) yields an empty list.
     */
    private List<Event> toEvents(Response response) {
        if (response == null) {
            return List.of();
        }
        checkResponse(response);
        // NOTE(review): decoding relies on response.attributes(); confirm this matches the
        // reply shape the Vert.x client produces for XREAD.
        return response.attributes().values().stream().map(this::mapAttributes).toList();
    }

    /**
     * Builds the {@code XADD} argument list for one event: stream key, entry id (the event
     * version) and the field/value pairs describing the event.
     */
    private List<String> mapArgs(Event event, String str) {
        // Arrays.asList (not List.of) is kept on purpose: it tolerates null values.
        return Arrays.asList(
            str,
            String.valueOf(event.eventVersion()),
            TENANT_ID, event.tenantId(),
            EVENT_CLASS, event.eventType(),
            COMMAND_ID, event.commandId(),
            EVENT, event.event().encode(),
            TAGS, new JsonArray(event.tags()).encode(),
            SCHEMA_VERSION, event.schemaVersion().toString()
        );
    }

    /** Builds the stream key {@code <AggregateSimpleName>-<aggregateId>-<tenantId>}. */
    private String streamKey(Object aggregateId, Object tenantId) {
        return this.aggregateClass.getSimpleName() + "-" + aggregateId + "-" + tenantId;
    }

    private <T extends Aggregate> String aggregateStream(StartStream<T> startStream) {
        return streamKey(startStream.aggregateId(), startStream.tenantId());
    }

    private <T extends Aggregate> String aggregateStream(AppendInstruction<T> appendInstruction) {
        return streamKey(appendInstruction.aggregateId(), appendInstruction.tenantId());
    }

    private <T extends Aggregate> String aggregateStream(PruneEventStream<T> pruneEventStream) {
        return streamKey(pruneEventStream.aggregateId(), pruneEventStream.tenantId());
    }

    private <T extends Aggregate> String aggregateStream(AggregateEventStream<T> aggregateEventStream) {
        return streamKey(aggregateEventStream.aggregateId(), aggregateEventStream.tenantId());
    }

    /**
     * Builds the {@code XREAD} arguments for one aggregate stream.
     * The {@code STREAMS} keyword is mandatory in the XREAD syntax and must precede the key.
     */
    private <T extends Aggregate> List<String> parseStreamArguments(AggregateEventStream<T> aggregateEventStream) {
        return List.of(
            "STREAMS",
            aggregateStream(aggregateEventStream),
            String.valueOf(aggregateEventStream.eventVersionOffset())
        );
    }

    /**
     * Not supported by this store.
     *
     * @throws RedisEventStoreException always
     */
    public Uni<List<Event>> fetch(EventStream eventStream) {
        throw notSupported();
    }

    /**
     * Not supported by this store.
     *
     * @throws RedisEventStoreException always
     */
    public Uni<Void> stream(EventStream eventStream, Consumer<Event> consumer) {
        throw notSupported();
    }

    /** Creates the exception thrown by the operations this store does not implement. */
    private static RedisEventStoreException notSupported() {
        return new RedisEventStoreException(Es4jErrorBuilder.builder().hint("not supported").errorSource(ErrorSource.INFRASTRUCTURE).cause("Redis error").build());
    }

    /** Decodes one reply element into an {@link Event} through its JSON representation. */
    private Event mapAttributes(Response response) {
        // NOTE(review): assumes the element's buffer holds a JSON object mappable to Event - confirm.
        return response.toBuffer().toJsonObject().mapTo(Event.class);
    }

    /**
     * Closes the Redis API handle if the store was started.
     *
     * @return an already completed {@link Uni}
     */
    public Uni<Void> stop() {
        if (this.redisApi != null) {
            this.redisApi.close();
        }
        return Uni.createFrom().voidItem();
    }

    /**
     * Creates the Redis client from configuration and records the aggregate class.
     *
     * <p>Reads {@code redisPassword}, {@code redisHost}, {@code redisPort} and {@code redisDb}
     * from {@code jsonObject} as strings.
     *
     * @param es4jDeployment supplies the aggregate class this store serves
     * @param vertx          the Vert.x instance the client is bound to
     * @param jsonObject     the configuration holding the Redis connection settings
     */
    public void start(Es4jDeployment es4jDeployment, Vertx vertx, JsonObject jsonObject) {
        this.aggregateClass = es4jDeployment.aggregateClass();
        final int cores = CpuCoreSensor.availableProcessors();
        final String password = jsonObject.getString("redisPassword");
        // NOTE(review): the password is embedded in the URI without URL-encoding and is also
        // set through setPassword - confirm which one the client uses.
        final String connectionString = "redis://:%s@%s:%s/%s".formatted(
            password,
            jsonObject.getString("redisHost"),
            jsonObject.getString("redisPort"),
            jsonObject.getString("redisDb")
        );
        final RedisOptions options = new RedisOptions()
            .setMaxPoolSize(cores)
            .setMaxWaitingHandlers(cores * 4)
            .setPassword(password)
            .setPoolName(CommandHandler.camelToKebab(this.aggregateClass.getSimpleName()))
            .setConnectionString(connectionString);
        this.redisClient = Redis.createClient(vertx, options);
        this.redisApi = RedisAPI.api(this.redisClient);
    }

    /**
     * No setup work is required for this store.
     *
     * @return an already completed {@link Uni} (never {@code null})
     */
    public Uni<Void> setup(Es4jDeployment es4jDeployment, Vertx vertx, JsonObject jsonObject) {
        return Uni.createFrom().voidItem();
    }

    /**
     * Trims the aggregate stream with {@code XTRIM ... MINID}, passing the prune instruction's
     * {@code offsetTo} as the threshold.
     *
     * @param pruneEventStream the aggregate coordinates and the trim threshold
     * @return a {@link Uni} completing once the trim has been acknowledged
     */
    public <T extends Aggregate> Uni<Void> trim(PruneEventStream<T> pruneEventStream) {
        final List<String> args = List.of(aggregateStream(pruneEventStream), "MINID", pruneEventStream.offsetTo().toString());
        return this.redisApi.xtrim(args).map(RedisEventStore::checkResponse).replaceWithVoid();
    }
}
