package io.fluxcapacitor.javaclient.eventsourcing.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.eventsourcing.AppendEvents;
import io.fluxcapacitor.common.api.eventsourcing.EventBatch;
import io.fluxcapacitor.common.api.eventsourcing.GetEvents;
import io.fluxcapacitor.javaclient.common.websocket.AbstractWebsocketClient;
import io.fluxcapacitor.javaclient.common.websocket.JsonDecoder;
import io.fluxcapacitor.javaclient.common.websocket.JsonEncoder;
import java.net.URI;
import java.util.List;
import java.util.stream.Stream;
import javax.websocket.ClientEndpoint;

@ClientEndpoint(encoders = {JsonEncoder.class}, decoders = {JsonDecoder.class})
/* loaded from: input_file:io/fluxcapacitor/javaclient/eventsourcing/client/WebSocketEventStoreClient.class */
public class WebSocketEventStoreClient extends AbstractWebsocketClient implements EventStoreClient {
    private final Backlog<EventBatch> backlog;
    private final int fetchBatchSize;

    public WebSocketEventStoreClient(String str) {
        this(URI.create(str), 1024, 1024);
    }

    public WebSocketEventStoreClient(String str, int i) {
        this(URI.create(str), i, 1024);
    }

    public WebSocketEventStoreClient(URI uri, int i, int i2) {
        super(uri);
        this.backlog = new Backlog<>(this::doSend, i);
        this.fetchBatchSize = i2;
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.client.EventStoreClient
    public Awaitable storeEvents(String str, String str2, long j, List<SerializedMessage> list) {
        return this.backlog.add(new EventBatch[]{new EventBatch(str, str2, j, list)});
    }

    private Awaitable doSend(List<EventBatch> list) {
        sendRequest(new AppendEvents(list));
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.client.EventStoreClient
    public Stream<SerializedMessage> getEvents(String str, long j) {
        return ObjectUtils.iterate(sendRequest(new GetEvents(str, Long.valueOf(j), this.fetchBatchSize)), getEventsResult -> {
            return sendRequest(new GetEvents(str, Long.valueOf(getEventsResult.getEventBatch().getLastSequenceNumber()), this.fetchBatchSize));
        }, getEventsResult2 -> {
            return getEventsResult2.getEventBatch().getEvents().size() < this.fetchBatchSize;
        }).flatMap(getEventsResult3 -> {
            return getEventsResult3.getEventBatch().getEvents().stream();
        });
    }
}
