package org.factcast.client.grpc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.CallCredentials2;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCalls;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import lombok.NonNull;
import net.devh.boot.grpc.client.security.CallCredentialsHelper;
import org.factcast.core.Fact;
import org.factcast.core.store.FactStore;
import org.factcast.core.store.RetryableException;
import org.factcast.core.store.StateToken;
import org.factcast.core.subscription.Subscription;
import org.factcast.core.subscription.SubscriptionImpl;
import org.factcast.core.subscription.SubscriptionRequestTO;
import org.factcast.core.subscription.observer.FactObserver;
import org.factcast.grpc.api.Capabilities;
import org.factcast.grpc.api.CompressionCodecs;
import org.factcast.grpc.api.ConditionalPublishRequest;
import org.factcast.grpc.api.StateForRequest;
import org.factcast.grpc.api.conv.ProtoConverter;
import org.factcast.grpc.api.conv.ProtocolVersion;
import org.factcast.grpc.api.conv.ServerConfig;
import org.factcast.grpc.api.gen.FactStoreProto;
import org.factcast.grpc.api.gen.RemoteFactStoreGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:BOOT-INF/lib/factcast-client-grpc-0.1.4.1.jar:org/factcast/client/grpc/GrpcFactStore.class */
public class GrpcFactStore implements FactStore, SmartInitializingSingleton {
    private final CompressionCodecs codecs;
    private static final String CHANNEL_NAME = "factstore";
    private RemoteFactStoreGrpc.RemoteFactStoreBlockingStub blockingStub;
    private RemoteFactStoreGrpc.RemoteFactStoreStub stub;
    private final ProtoConverter converter;
    private final AtomicBoolean initialized;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) GrpcFactStore.class);
    private static final ProtocolVersion PROTOCOL_VERSION = ProtocolVersion.of(1, 1, 0);

    @Autowired
    @Generated
    public GrpcFactStore(FactCastGrpcChannelFactory factCastGrpcChannelFactory, @Value("${grpc.client.factstore.credentials:#{null}}") Optional<String> optional) {
        this(factCastGrpcChannelFactory.createChannel(CHANNEL_NAME), optional);
    }

    @VisibleForTesting
    @Generated
    GrpcFactStore(Channel channel, Optional<String> optional) {
        this(RemoteFactStoreGrpc.newBlockingStub(channel), RemoteFactStoreGrpc.newStub(channel), optional);
    }

    private GrpcFactStore(RemoteFactStoreGrpc.RemoteFactStoreBlockingStub remoteFactStoreBlockingStub, RemoteFactStoreGrpc.RemoteFactStoreStub remoteFactStoreStub, Optional<String> optional) {
        this.codecs = new CompressionCodecs();
        this.converter = new ProtoConverter();
        this.initialized = new AtomicBoolean(false);
        this.blockingStub = remoteFactStoreBlockingStub;
        this.stub = remoteFactStoreStub;
        if (optional.isPresent()) {
            String[] split = optional.get().split(":");
            if (split.length != 2) {
                throw new IllegalArgumentException("Credentials in 'grpc.client.factstore.credentials' have to be defined as 'username:password'");
            }
            CallCredentials2 basicAuth = CallCredentialsHelper.basicAuth(split[0], split[1]);
            this.blockingStub = this.blockingStub.withCallCredentials(basicAuth);
            this.stub = this.stub.withCallCredentials(basicAuth);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public Optional<Fact> fetchById(UUID uuid) {
        log.trace("fetching {} from remote store", uuid);
        try {
            FactStoreProto.MSG_OptionalFact fetchById = this.blockingStub.fetchById(this.converter.toProto(uuid));
            return !fetchById.getPresent() ? Optional.empty() : this.converter.fromProto(fetchById);
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public void publish(@NonNull List<? extends Fact> list) {
        if (list == null) {
            throw new NullPointerException("factsToPublish is marked non-null but is null");
        }
        log.trace("publishing {} facts to remote store", Integer.valueOf(list.size()));
        Stream<? extends Fact> stream = list.stream();
        ProtoConverter protoConverter = this.converter;
        protoConverter.getClass();
        try {
            this.blockingStub.publish(FactStoreProto.MSG_Facts.newBuilder().addAllFact((List) stream.map(protoConverter::toProto).collect(Collectors.toList())).build());
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public Subscription subscribe(@NonNull SubscriptionRequestTO subscriptionRequestTO, @NonNull FactObserver factObserver) {
        if (subscriptionRequestTO == null) {
            throw new NullPointerException("req is marked non-null but is null");
        }
        if (factObserver == null) {
            throw new NullPointerException("observer is marked non-null but is null");
        }
        SubscriptionImpl on = SubscriptionImpl.on(factObserver);
        ClientStreamObserver clientStreamObserver = new ClientStreamObserver(on);
        ClientCall newCall = this.stub.getChannel().newCall(RemoteFactStoreGrpc.getSubscribeMethod(), this.stub.getCallOptions().withWaitForReady());
        try {
            ClientCalls.asyncServerStreamingCall(newCall, this.converter.toProto(subscriptionRequestTO), clientStreamObserver);
            return on.onClose(() -> {
                cancel(newCall);
            });
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @VisibleForTesting
    void cancel(ClientCall<FactStoreProto.MSG_SubscriptionRequest, FactStoreProto.MSG_Notification> clientCall) {
        clientCall.cancel("Client is no longer interested", null);
    }

    @Override // org.factcast.core.store.FactStore
    public OptionalLong serialOf(@NonNull UUID uuid) {
        if (uuid == null) {
            throw new NullPointerException("l is marked non-null but is null");
        }
        try {
            return this.converter.fromProto(this.blockingStub.serialOf(this.converter.toProto(uuid)));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    public synchronized void initialize() {
        if (this.initialized.getAndSet(true)) {
            return;
        }
        log.debug("Invoking handshake");
        try {
            ServerConfig fromProto = this.converter.fromProto(this.blockingStub.handshake(this.converter.empty()));
            ProtocolVersion version = fromProto.version();
            Map<String, String> properties = fromProto.properties();
            logProtocolVersion(version);
            logServerVersion(properties);
            configureCompression(properties.get(Capabilities.CODECS.toString()));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    private static void logServerVersion(Map<String, String> map) {
        log.info("Server reported implementation version {}", map.get(Capabilities.FACTCAST_IMPL_VERSION.toString()));
    }

    private static void logProtocolVersion(ProtocolVersion protocolVersion) {
        if (!PROTOCOL_VERSION.isCompatibleTo(protocolVersion)) {
            throw new IncompatibleProtocolVersions("Apparently, the local Protocol Version " + PROTOCOL_VERSION + " is not compatible with the Server's " + protocolVersion + ". \nPlease choose a compatible GRPC Client to connect to this Server.");
        }
        if (PROTOCOL_VERSION.equals(protocolVersion)) {
            log.info("Matching protocol version encountered {}", protocolVersion);
        } else {
            log.info("Compatible protocol version encountered client={}, server={}", PROTOCOL_VERSION, protocolVersion);
        }
    }

    @VisibleForTesting
    void configureCompression(String str) {
        this.codecs.selectFrom(str).ifPresent(str2 -> {
            log.info("configuring Codec " + str2);
            this.blockingStub = this.blockingStub.withCompression(str2);
            this.stub = this.stub.withCompression(str2);
        });
    }

    @Override // org.springframework.beans.factory.SmartInitializingSingleton
    public synchronized void afterSingletonsInstantiated() {
        initialize();
    }

    @Override // org.factcast.core.store.FactStore
    public Set<String> enumerateNamespaces() {
        try {
            return this.converter.fromProto(this.blockingStub.enumerateNamespaces(this.converter.empty()));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public Set<String> enumerateTypes(String str) {
        try {
            return this.converter.fromProto(this.blockingStub.enumerateTypes(this.converter.toProto(str)));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @VisibleForTesting
    static RuntimeException wrapRetryable(StatusRuntimeException statusRuntimeException) {
        return statusRuntimeException.getStatus().getCode() == Status.Code.UNAVAILABLE ? new RetryableException(statusRuntimeException) : statusRuntimeException;
    }

    @Override // org.factcast.core.store.FactStore
    public boolean publishIfUnchanged(@NonNull List<? extends Fact> list, @NonNull Optional<StateToken> optional) {
        if (list == null) {
            throw new NullPointerException("factsToPublish is marked non-null but is null");
        }
        if (optional == null) {
            throw new NullPointerException("token is marked non-null but is null");
        }
        try {
            return this.blockingStub.publishConditional(this.converter.toProto(new ConditionalPublishRequest(list, (UUID) optional.map((v0) -> {
                return v0.uuid();
            }).orElse(null)))).getSuccess();
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public void invalidate(@NonNull StateToken stateToken) {
        if (stateToken == null) {
            throw new NullPointerException("token is marked non-null but is null");
        }
        try {
            this.blockingStub.invalidate(this.converter.toProto(stateToken.uuid()));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public StateToken stateFor(@NonNull Collection<UUID> collection, @NonNull Optional<String> optional) {
        if (collection == null) {
            throw new NullPointerException("forAggIds is marked non-null but is null");
        }
        if (optional == null) {
            throw new NullPointerException("ns is marked non-null but is null");
        }
        try {
            return new StateToken(this.converter.fromProto(this.blockingStub.stateFor(this.converter.toProto(new StateForRequest(Lists.newArrayList(collection), optional.orElse(null))))));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }

    @Override // org.factcast.core.store.FactStore
    public long currentTime() {
        try {
            return this.converter.fromProto(this.blockingStub.currentTime(this.converter.empty()));
        } catch (StatusRuntimeException e) {
            throw wrapRetryable(e);
        }
    }
}
