package io.contract_testing.contractcase.internal.client.rpc;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.google.protobuf.AbstractMessage;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.GeneratedMessageV3.Builder;
import com.google.protobuf.StringValue;
import io.contract_testing.contractcase.configuration.LogLevel;
import io.contract_testing.contractcase.exceptions.ContractCaseConfigurationError;
import io.contract_testing.contractcase.exceptions.ContractCaseCoreError;
import io.contract_testing.contractcase.grpc.ContractCaseGrpc;
import io.contract_testing.contractcase.grpc.ContractCaseStream;
import io.contract_testing.contractcase.internal.client.MaintainerLog;
import io.contract_testing.contractcase.internal.client.server.ContractCaseProcess;
import io.contract_testing.contractcase.internal.edge.ConnectorExceptionMapper;
import io.contract_testing.contractcase.internal.edge.ConnectorFailure;
import io.contract_testing.contractcase.internal.edge.ConnectorFailureKindConstants;
import io.contract_testing.contractcase.internal.edge.ConnectorInvokableFunctionMapper;
import io.contract_testing.contractcase.internal.edge.ConnectorResult;
import io.contract_testing.contractcase.internal.edge.RunTestCallback;
import io.contract_testing.contractcase.logs.LogPrinter;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.jetbrains.annotations.NotNull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/contract_testing/contractcase/internal/client/rpc/AbstractRpcConnector.class */
public abstract class AbstractRpcConnector<T extends AbstractMessage, B extends GeneratedMessageV3.Builder<B>> {
    private final SendingWorker<T> worker;
    private static final Semaphore sendMutex = new Semaphore(1);
    private static final int DEFAULT_TIMEOUT_SECONDS = 60;
    private volatile ContractCaseStream.BoundaryResult failedResult;
    private final Map<String, ConnectorInvokableFunctionMapper.ConnectorInvokableFunction> registeredFunctions = new ConcurrentHashMap();
    private final ResponseWaiter responseWaiter = new ResponseWaiter();
    protected final CountDownLatch finishLatch = new CountDownLatch(1);
    private final ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", ContractCaseProcess.getInstance().getPortNumber()).defaultServiceConfig(readServiceConfig()).usePlaintext().enableRetry().build();

    public AbstractRpcConnector(@NotNull LogPrinter logPrinter, @NotNull ConfigHandle configHandle, @NotNull RunTestCallback runTestCallback) {
        this.worker = SendingWorker.create(createConnection(ContractCaseGrpc.newStub(this.channel), new ContractResponseStreamObserver<>(this, logPrinter, configHandle, runTestCallback)));
    }

    protected Map<String, ?> readServiceConfig() {
        InputStream resourceAsStream = AbstractRpcConnector.class.getResourceAsStream("service_config.json");
        if (resourceAsStream == null) {
            throw new ContractCaseCoreError("Unable to read the service config resource. This indicates that the ContractCase jar was built incorrectly.");
        }
        return (Map) new Gson().fromJson(new JsonReader(new InputStreamReader(resourceAsStream, StandardCharsets.UTF_8)), Map.class);
    }

    abstract StreamObserver<T> createConnection(ContractCaseGrpc.ContractCaseStub contractCaseStub, ContractResponseStreamObserver<T, B> contractResponseStreamObserver);

    abstract T setId(B b, StringValue stringValue);

    abstract B makeResponse(ContractCaseStream.ResultResponse resultResponse);

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract B makeInvokeTest(StringValue stringValue);

    /* JADX INFO: Access modifiers changed from: package-private */
    public abstract B makeInvokeFunction(String str, List<String> list);

    public ConnectorResult executeCallAndWait(B b, String str) {
        return executeCallAndWait(b, str, DEFAULT_TIMEOUT_SECONDS);
    }

    public ConnectorResult executeCallAndWait(B b, String str, int i) {
        if (this.failedResult != null) {
            return ConnectorIncomingMapper.mapBoundaryResult(this.failedResult);
        }
        String createAwait = this.responseWaiter.createAwait(str);
        this.worker.send(setId(b, ConnectorOutgoingMapper.map(createAwait)), LogLevel.MAINTAINER_DEBUG);
        return this.responseWaiter.awaitResponse(createAwait, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancelAll(ContractCaseCoreError contractCaseCoreError) {
        ContractCaseStream.BoundaryResult mapResult = ConnectorOutgoingMapper.mapResult(ConnectorExceptionMapper.map(contractCaseCoreError));
        this.failedResult = mapResult;
        this.responseWaiter.cancelAll(mapResult);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completeWait(String str, ContractCaseStream.BoundaryResult boundaryResult) {
        this.responseWaiter.completeAwait(str, boundaryResult);
    }

    void sendResponse(B b, String str, LogLevel logLevel) {
        try {
            sendMutex.acquire();
            try {
                this.worker.send(setId(b, ConnectorOutgoingMapper.map(str)), logLevel);
                sendMutex.release();
            } catch (Throwable th) {
                sendMutex.release();
                throw th;
            }
        } catch (InterruptedException e) {
            throw new ContractCaseCoreError("Interrupted while waiting to aquire the send mutex.\nIf this happened without you killing the test run, then there may be a threading bug in the ContractCase java DSL.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendResponse(ContractCaseStream.ResultResponse resultResponse, String str, LogLevel logLevel) {
        sendResponse((AbstractRpcConnector<T, B>) makeResponse(resultResponse), str, logLevel);
    }

    public void close() {
        this.worker.close();
        try {
            this.finishLatch.await(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
    }

    public <R> void registerFunction(String str, ConnectorInvokableFunctionMapper.ConnectorInvokableFunction connectorInvokableFunction) {
        if (this.registeredFunctions.containsKey(str)) {
            throw new ContractCaseConfigurationError("The function '' was already registered. Make sure you are only registering it once.", "UNDOCUMENTED");
        }
        this.registeredFunctions.put(str, connectorInvokableFunction);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectorResult invokeFunction(String str, List<String> list) {
        ConnectorInvokableFunctionMapper.ConnectorInvokableFunction connectorInvokableFunction = this.registeredFunctions.get(str);
        return connectorInvokableFunction == null ? new ConnectorFailure(ConnectorFailureKindConstants.CASE_CORE_ERROR, "The core asked us to invoke the function '" + str + "' but it didn't exist in our store", MaintainerLog.CONTRACT_CASE_JAVA_WRAPPER, str, "") : connectorInvokableFunction.apply(list);
    }
}
