package io.scalecube.services;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import io.scalecube.cluster.membership.IdGenerator;
import io.scalecube.services.metrics.Metrics;
import io.scalecube.services.routing.Router;
import io.scalecube.transport.Message;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/* loaded from: input_file:io/scalecube/services/ServiceCall.class */
public class ServiceCall {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceProxyFactory.class);
    private Duration timeout;
    private Router router;
    private Timer latency;
    private Metrics metrics;

    public ServiceCall(Router router, Duration duration, Metrics metrics) {
        this.router = router;
        this.timeout = duration;
        this.metrics = metrics;
        this.latency = Metrics.timer(this.metrics, ServiceCall.class.getName(), "invoke");
    }

    public ServiceCall(Router router, Duration duration) {
        this(router, duration, null);
    }

    public CompletableFuture<Message> invoke(Message message) {
        return invoke(message, this.timeout);
    }

    public CompletableFuture<Message> invoke(Message message, Duration duration) {
        Messages.validate().serviceRequest(message);
        Optional<ServiceInstance> route = this.router.route(message);
        if (route.isPresent()) {
            return invoke(message, route.get(), duration);
        }
        throw noReachableMemberException(message);
    }

    public CompletableFuture<Message> invoke(Message message, ServiceInstance serviceInstance) throws Exception {
        Messages.validate().serviceRequest(message);
        return invoke(message, serviceInstance, this.timeout);
    }

    public CompletableFuture<Message> invoke(Message message, ServiceInstance serviceInstance, Duration duration) {
        Objects.requireNonNull(serviceInstance);
        Messages.validate().serviceRequest(message);
        serviceInstance.checkMethodExists(message.header(ServiceHeaders.METHOD));
        if (serviceInstance.isLocal().booleanValue()) {
            return serviceInstance.invoke(message);
        }
        String generateId = IdGenerator.generateId();
        ServiceResponse correlationId = ServiceResponse.correlationId(generateId);
        Timer.Context time = Metrics.time(this.latency);
        Metrics.mark(this.metrics, ServiceCall.class.getName(), "invoke", "request");
        Counter counter = Metrics.counter(this.metrics, ServiceCall.class.getName(), "invoke-pending");
        Metrics.inc(counter);
        serviceInstance.invoke(Messages.asRequest(message, generateId)).whenComplete((message2, th) -> {
            Metrics.dec(counter);
            Metrics.stop(time);
            if (th == null) {
                Metrics.mark(this.metrics, ServiceCall.class, "invoke", "response");
                correlationId.withTimeout(duration);
            } else {
                Metrics.mark(this.metrics, ServiceCall.class.getName(), "invoke", "error");
                correlationId.completeExceptionally(th);
            }
        });
        return correlationId.future();
    }

    public Observable<Message> invokeAll(Message message) {
        return invokeAll(message, this.timeout);
    }

    public Observable<Message> invokeAll(Message message, Duration duration) {
        SerializedSubject serialized = PublishSubject.create().toSerialized();
        this.router.routes(message).forEach(serviceInstance -> {
            invoke(message, duration).whenComplete((message2, th) -> {
                if (message2 != null) {
                    serialized.onNext(message2);
                } else {
                    serialized.onNext(Messages.asError(th, message.correlationId(), serviceInstance.memberId()));
                }
            });
        });
        return serialized.onBackpressureBuffer().asObservable();
    }

    public Observable<Message> listen(Message message) {
        Messages.validate().serviceRequest(message);
        Optional<ServiceInstance> route = this.router.route(message);
        if (!route.isPresent()) {
            throw noReachableMemberException(message);
        }
        ServiceInstance serviceInstance = route.get();
        Preconditions.checkArgument(serviceInstance.methodExists(message.header(ServiceHeaders.METHOD)), "instance has no such requested method");
        return serviceInstance.listen(message);
    }

    private IllegalStateException noReachableMemberException(Message message) {
        String header = message.header(ServiceHeaders.SERVICE_REQUEST);
        String header2 = message.header(ServiceHeaders.METHOD);
        LOGGER.error("Failed  to invoke service, No reachable member with such service definition [{}], args [{}]", header, message);
        return new IllegalStateException("No reachable member with such service: " + header2);
    }
}
