package io.fluxcapacitor.javaclient.persisting.eventsourcing.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.BooleanResult;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.eventsourcing.AppendEvents;
import io.fluxcapacitor.common.api.eventsourcing.DeleteEvents;
import io.fluxcapacitor.common.api.eventsourcing.EventBatch;
import io.fluxcapacitor.common.api.eventsourcing.GetEvents;
import io.fluxcapacitor.common.api.eventsourcing.GetEventsResult;
import io.fluxcapacitor.javaclient.common.websocket.AbstractWebsocketClient;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.AggregateEventStream;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.websocket.ClientEndpoint;

/**
 * {@link EventStoreClient} that issues its requests through the {@code sendRequest} and
 * {@code sendRequestAndWait} methods it gets from {@link AbstractWebsocketClient}.
 *
 * <p>Appends go through a {@link Backlog}: {@link #storeEvents} adds one {@link EventBatch} to the backlog, and the
 * backlog invokes {@code doSend} with a list of batches that are then sent in a single {@link AppendEvents} request.
 * Reads are paged using {@code fetchBatchSize} as the page size of every {@link GetEvents} request.
 */
@ClientEndpoint
public class WebSocketEventStoreClient extends AbstractWebsocketClient implements EventStoreClient {
    private final Backlog<EventBatch> backlog;
    private final int fetchBatchSize;

    /**
     * Creates a client for the given endpoint, passing 1024 to the backlog and using a fetch batch size of 8192.
     *
     * <p>NOTE(review): the other convenience constructor defaults the fetch batch size to 1024 rather than 8192;
     * confirm whether that difference is intentional.
     *
     * @param str        endpoint location, parsed with {@link URI#create(String)}
     * @param properties forwarded unchanged to the superclass constructor
     */
    public WebSocketEventStoreClient(String str, WebSocketClient.Properties properties) {
        this(URI.create(str), 1024, 8192, properties);
    }

    /**
     * Creates a client for the given endpoint with a caller-supplied backlog argument and a fetch batch size of
     * 1024.
     *
     * @param str        endpoint location, parsed with {@link URI#create(String)}
     * @param i          value handed to the {@link Backlog} constructor (see the four-argument constructor)
     * @param properties forwarded unchanged to the superclass constructor
     */
    public WebSocketEventStoreClient(String str, int i, WebSocketClient.Properties properties) {
        this(URI.create(str), i, 1024, properties);
    }

    /**
     * Creates a client for the given endpoint.
     *
     * @param uri        endpoint location, forwarded to the superclass constructor
     * @param i          second argument of the {@link Backlog} constructor; presumably the maximum number of
     *                   batches handed to {@code doSend} at once (TODO confirm against {@code Backlog})
     * @param i2         number of events requested per {@link GetEvents} call in {@link #getEvents}
     * @param properties forwarded unchanged to the superclass constructor
     */
    public WebSocketEventStoreClient(URI uri, int i, int i2, WebSocketClient.Properties properties) {
        super(uri, properties);
        this.backlog = new Backlog<>(this::doSend, i);
        this.fetchBatchSize = i2;
    }

    /**
     * Wraps the arguments in a single {@link EventBatch} and adds it to the backlog.
     *
     * @return the {@link Awaitable} returned by the backlog for the added batch
     */
    @Override
    public Awaitable storeEvents(String str, String str2, long j, List<SerializedMessage> list) {
        return backlog.add(new EventBatch[]{new EventBatch(str, str2, j, list)});
    }

    /**
     * Backlog callback: sends all given batches in one {@link AppendEvents} request.
     *
     * @return {@link Awaitable#ready()}, once {@code sendRequestAndWait} has returned
     */
    private Awaitable doSend(List<EventBatch> list) {
        sendRequestAndWait(new AppendEvents(list));
        return Awaitable.ready();
    }

    /**
     * Returns the events obtained for {@code str}, starting from sequence number {@code j}, as a stream that is
     * backed by paged {@link GetEvents} requests.
     *
     * <p>The first page is requested immediately, because its batch also supplies the domain of the returned
     * stream. Follow-up pages are produced by {@link ObjectUtils#iterate}, each one requested from the last
     * sequence number of the page before it. The predicate passed to {@code iterate} is true for a page holding
     * fewer than {@code fetchBatchSize} events; presumably {@code iterate} ends the stream at that page (TODO confirm
     * against {@code ObjectUtils.iterate}).
     *
     * <p>The last-sequence-number supplier given to the returned stream is updated as pages flow through the
     * stream, so it yields {@code null} until a non-empty page has been consumed.
     */
    @Override
    public AggregateEventStream<SerializedMessage> getEvents(String str, long j) {
        AtomicReference<Long> lastSequenceNumber = new AtomicReference<>();
        GetEventsResult firstPage = fetchBatch(str, j);
        Stream<SerializedMessage> events = ObjectUtils.iterate(
                        firstPage,
                        previous -> fetchBatch(str, previous.getEventBatch().getLastSequenceNumber()),
                        page -> page.getEventBatch().getEvents().size() < fetchBatchSize)
                .flatMap(page -> {
                    // Empty pages must not overwrite the sequence number recorded for an earlier page.
                    if (!page.getEventBatch().isEmpty()) {
                        lastSequenceNumber.set(page.getEventBatch().getLastSequenceNumber());
                    }
                    return page.getEventBatch().getEvents().stream();
                });
        return new AggregateEventStream<>(events, str, firstPage.getEventBatch().getDomain(),
                                          lastSequenceNumber::get);
    }

    /** Requests one page of at most {@code fetchBatchSize} events for {@code str} from the given sequence number. */
    private GetEventsResult fetchBatch(String str, long fromSequenceNumber) {
        return sendRequestAndWait(new GetEvents(str, fromSequenceNumber, fetchBatchSize));
    }

    /**
     * Sends a {@link DeleteEvents} request for {@code str}.
     *
     * @return a future completing with the {@code success} flag of the {@link BooleanResult} response
     */
    @Override
    public CompletableFuture<Boolean> deleteEvents(String str) {
        return sendRequest(new DeleteEvents(str)).thenApply(result -> ((BooleanResult) result).isSuccess());
    }
}
