package alluxio.master.journal.raft.transport;

import alluxio.grpc.CopycatMessage;
import alluxio.grpc.CopycatRequestHeader;
import alluxio.grpc.CopycatResponseHeader;
import alluxio.resource.LockResource;
import com.google.common.base.MoreObjects;
import com.google.protobuf.UnsafeByteOperations;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.Listeners;
import io.atomix.catalyst.concurrent.Scheduled;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.grpc.stub.StreamObserver;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/master/journal/raft/transport/CopycatGrpcConnection.class */
public abstract class CopycatGrpcConnection implements Connection, StreamObserver<CopycatMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(CopycatGrpcConnection.class);
    private static AtomicLong sConnectionIdCounter = new AtomicLong(0);
    private Throwable mLastFailure;
    private final ThreadContext mContext;
    private final ConnectionOwner mConnectionOwner;
    private final String mConnectionId;
    private StreamObserver<CopycatMessage> mTargetObserver;
    private final long mRequestTimeoutMs;
    private final Scheduled mTimeoutScheduler;
    private final ExecutorService mExecutor;
    private final ReadWriteLock mStateLock = new ReentrantReadWriteLock();
    private boolean mClosed = false;
    private boolean mStreamCompleted = false;
    private final AtomicLong mRequestCounter = new AtomicLong(0);
    private final Listeners<Throwable> mExceptionListeners = new Listeners<>();
    private final Listeners<Connection> mCloseListeners = new Listeners<>();
    private final Map<Class, HandlerHolder> mHandlers = new ConcurrentHashMap();
    private final Map<Long, ContextualFuture> mResponseFutures = new ConcurrentSkipListMap();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:alluxio/master/journal/raft/transport/CopycatGrpcConnection$ConnectionOwner.class */
    public enum ConnectionOwner {
        CLIENT,
        SERVER
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:alluxio/master/journal/raft/transport/CopycatGrpcConnection$ContextualFuture.class */
    public static class ContextualFuture<T> extends CompletableFuture<T> {
        private final long mCreationTime;
        private final ThreadContext mContext;

        private ContextualFuture(long j, ThreadContext threadContext) {
            this.mCreationTime = j;
            this.mContext = threadContext;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ThreadContext getContext() {
            return this.mContext;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getCreationTime() {
            return this.mCreationTime;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:alluxio/master/journal/raft/transport/CopycatGrpcConnection$HandlerHolder.class */
    public static class HandlerHolder {
        private final Function<Object, CompletableFuture<Object>> mHandler;
        private final ThreadContext mContext;

        private HandlerHolder(Function function, ThreadContext threadContext) {
            this.mHandler = function;
            this.mContext = threadContext;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ThreadContext getContext() {
            return this.mContext;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Function<Object, CompletableFuture<Object>> getHandler() {
            return this.mHandler;
        }
    }

    public CopycatGrpcConnection(ConnectionOwner connectionOwner, String str, ThreadContext threadContext, ExecutorService executorService, long j) {
        this.mConnectionOwner = connectionOwner;
        this.mConnectionId = MoreObjects.toStringHelper(this).add("ConnectionOwner", this.mConnectionOwner).add("ConnectionId", sConnectionIdCounter.incrementAndGet()).add("TransportId", str).toString();
        this.mContext = threadContext;
        this.mExecutor = executorService;
        this.mRequestTimeoutMs = j;
        this.mTimeoutScheduler = threadContext.schedule(Duration.ofMillis(this.mRequestTimeoutMs), Duration.ofMillis(this.mRequestTimeoutMs / 2), this::timeoutPendingRequests);
    }

    public void setTargetObserver(StreamObserver<CopycatMessage> streamObserver) {
        this.mTargetObserver = streamObserver;
    }

    public CompletableFuture<Void> send(Object obj) {
        return sendAndReceiveInternal(obj, true);
    }

    public <T, U> CompletableFuture<U> sendAndReceive(T t) {
        return sendAndReceiveInternal(t, false);
    }

    private <T, U> CompletableFuture<U> sendAndReceiveInternal(T t, boolean z) {
        LockResource lockResource = new LockResource(this.mStateLock.readLock());
        Throwable th = null;
        try {
            Assert.notNull(t, "request");
            ContextualFuture contextualFuture = new ContextualFuture(System.currentTimeMillis(), ThreadContext.currentContextOrThrow());
            if (this.mClosed) {
                contextualFuture.completeExceptionally(new IllegalStateException("Connection closed"));
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                return contextualFuture;
            }
            long incrementAndGet = this.mRequestCounter.incrementAndGet();
            this.mResponseFutures.put(Long.valueOf(incrementAndGet), contextualFuture);
            try {
                this.mTargetObserver.onNext(CopycatMessage.newBuilder().setRequestHeader(CopycatRequestHeader.newBuilder().setRequestId(incrementAndGet)).setMessage(UnsafeByteOperations.unsafeWrap(contextualFuture.getContext().serializer().writeObject(t).array())).build());
                if (z) {
                    contextualFuture.complete(null);
                }
                LOG.debug("Submitted request({}) of type: {}. Connection: {} FireAndForget: {}", new Object[]{Long.valueOf(incrementAndGet), t.getClass().getName(), this.mConnectionId, Boolean.valueOf(z)});
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                return contextualFuture;
            } catch (Exception e) {
                contextualFuture.completeExceptionally(e);
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                return contextualFuture;
            }
        } catch (Throwable th5) {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th5;
        }
    }

    public <T, U> Connection handler(Class<T> cls, Consumer<T> consumer) {
        return handler(cls, obj -> {
            consumer.accept(obj);
            return null;
        });
    }

    public <T, U> Connection handler(Class<T> cls, Function<T, CompletableFuture<U>> function) {
        Assert.notNull(cls, "type");
        LockResource lockResource = new LockResource(this.mStateLock.readLock());
        Throwable th = null;
        try {
            if (this.mClosed) {
                throw new IllegalStateException("Connection closed");
            }
            this.mHandlers.put(cls, new HandlerHolder(function, ThreadContext.currentContextOrThrow()));
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    lockResource.close();
                }
            }
            return null;
        } catch (Throwable th3) {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th3;
        }
    }

    private void handleRequestMessage(CopycatMessage copycatMessage) {
        long requestId = copycatMessage.getRequestHeader().getRequestId();
        try {
            Object readObject = this.mContext.serializer().readObject(copycatMessage.getMessage().newInput());
            LOG.debug("Handling request({}) of type: {}. Connection: {}", new Object[]{Long.valueOf(requestId), readObject.getClass().getName(), this.mConnectionId});
            HandlerHolder handlerHolder = this.mHandlers.get(readObject.getClass());
            if (handlerHolder != null) {
                handlerHolder.getContext().executor().execute(() -> {
                    handleRequest(requestId, readObject, handlerHolder);
                });
            } else {
                sendResponse(requestId, this.mContext, new SerializationException("Unknown message type: " + readObject.getClass()));
            }
        } catch (SerializationException e) {
            sendResponse(requestId, this.mContext, e);
        }
    }

    private void handleRequest(long j, Object obj, HandlerHolder handlerHolder) {
        CompletableFuture completableFuture = (CompletableFuture) handlerHolder.getHandler().apply(obj);
        if (completableFuture != null) {
            completableFuture.whenComplete((obj2, th) -> {
                Runnable runnable = () -> {
                    if (th == null) {
                        sendResponse(j, this.mContext, obj2);
                    } else {
                        sendResponse(j, this.mContext, th);
                    }
                };
                if (ThreadContext.currentContext() != null) {
                    runnable.run();
                } else {
                    this.mContext.executor().execute(runnable);
                }
            });
        }
    }

    private void sendResponse(long j, ThreadContext threadContext, Object obj) {
        LOG.debug("Sending response of type: {} for request({}). Connection: {}", new Object[]{responseObjectType(obj), Long.valueOf(j), this.mConnectionOwner});
        CopycatMessage.Builder responseHeader = CopycatMessage.newBuilder().setResponseHeader(CopycatResponseHeader.newBuilder().setRequestId(j).setIsThrowable(obj instanceof Throwable));
        if (obj != null) {
            responseHeader.setMessage(UnsafeByteOperations.unsafeWrap(threadContext.serializer().writeObject(obj).array()));
        }
        this.mTargetObserver.onNext(responseHeader.build());
    }

    protected void handleResponseMessage(CopycatMessage copycatMessage) {
        ContextualFuture remove = this.mResponseFutures.remove(Long.valueOf(copycatMessage.getResponseHeader().getRequestId()));
        if (remove == null) {
            LOG.debug("Received a response for nonexistent request({}).Connection is closed or the request has been timed out. Connection: {}", Long.valueOf(copycatMessage.getResponseHeader().getRequestId()), this.mConnectionId);
            return;
        }
        try {
            if (copycatMessage.getResponseHeader().getIsThrowable()) {
                Throwable th = (Throwable) this.mContext.serializer().readObject(copycatMessage.getMessage().newInput());
                LOG.debug("Received an exception for request({}). Connection: {}", new Object[]{Long.valueOf(copycatMessage.getResponseHeader().getRequestId()), this.mConnectionId, th});
                remove.getContext().executor().execute(() -> {
                    remove.completeExceptionally(th);
                });
            } else {
                AtomicReference atomicReference = new AtomicReference(null);
                if (copycatMessage.hasMessage()) {
                    atomicReference.set(this.mContext.serializer().readObject(copycatMessage.getMessage().newInput()));
                }
                LOG.debug("Received response of type: {} for request({}). Connection: {}", new Object[]{responseObjectType(atomicReference.get()), Long.valueOf(copycatMessage.getResponseHeader().getRequestId()), this.mConnectionId});
                remove.getContext().executor().execute(() -> {
                    remove.complete(atomicReference.get());
                });
            }
        } catch (SerializationException e) {
            remove.getContext().executor().execute(() -> {
                remove.completeExceptionally(e);
            });
        }
    }

    private String responseObjectType(Object obj) {
        return obj != null ? obj.getClass().getName() : "<NULL>";
    }

    public Listener<Throwable> onException(Consumer<Throwable> consumer) {
        if (this.mLastFailure != null) {
            consumer.accept(this.mLastFailure);
        }
        return this.mExceptionListeners.add((Consumer) Assert.notNull(consumer, "listener"));
    }

    public Listener<Connection> onClose(Consumer<Connection> consumer) {
        if (this.mClosed) {
            consumer.accept(this);
        }
        return this.mCloseListeners.add((Consumer) Assert.notNull(consumer, "listener"));
    }

    public CompletableFuture<Void> close() {
        return this.mClosed ? CompletableFuture.completedFuture(null) : CompletableFuture.runAsync(() -> {
            LOG.debug("Closing connection: {}", this.mConnectionId);
            LockResource lockResource = new LockResource(this.mStateLock.writeLock());
            Throwable th = null;
            try {
                this.mClosed = true;
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                this.mTimeoutScheduler.cancel();
                if (!this.mStreamCompleted) {
                    try {
                        this.mTargetObserver.onCompleted();
                    } catch (Exception e) {
                        LOG.debug("Completing underlying gRPC stream failed.", e);
                    }
                }
                failPendingRequests(new ConnectException("Connection closed."));
                Iterator it = this.mCloseListeners.iterator();
                while (it.hasNext()) {
                    ((Listener) it.next()).accept(this);
                }
            } catch (Throwable th3) {
                if (lockResource != null) {
                    if (0 != 0) {
                        try {
                            lockResource.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        lockResource.close();
                    }
                }
                throw th3;
            }
        }, this.mExecutor);
    }

    public void onNext(CopycatMessage copycatMessage) {
        LOG.debug("Received a new message. Connection: {}, RequestHeader: {}, ResponseHeader: {}", new Object[]{this.mConnectionId, copycatMessage.getRequestHeader(), copycatMessage.getResponseHeader()});
        if (copycatMessage.hasRequestHeader()) {
            handleRequestMessage(copycatMessage);
        } else {
            if (!copycatMessage.hasResponseHeader()) {
                throw new RuntimeException("Message should contain a request/response header.");
            }
            handleResponseMessage(copycatMessage);
        }
    }

    public void onError(Throwable th) {
        LOG.debug("Connection failed: {}", this.mConnectionId, th);
        LockResource lockResource = new LockResource(this.mStateLock.writeLock());
        Throwable th2 = null;
        try {
            this.mClosed = true;
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th3) {
                        th2.addSuppressed(th3);
                    }
                } else {
                    lockResource.close();
                }
            }
            this.mLastFailure = th;
            failPendingRequests(th);
            Iterator it = this.mExceptionListeners.iterator();
            while (it.hasNext()) {
                ((Listener) it.next()).accept(th);
            }
            Iterator it2 = this.mCloseListeners.iterator();
            while (it2.hasNext()) {
                ((Listener) it2.next()).accept(this);
            }
        } catch (Throwable th4) {
            if (lockResource != null) {
                if (0 != 0) {
                    try {
                        lockResource.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    lockResource.close();
                }
            }
            throw th4;
        }
    }

    public void onCompleted() {
        LOG.debug("Connection completed: {}", this.mConnectionId);
        this.mStreamCompleted = true;
        if (this.mConnectionOwner == ConnectionOwner.SERVER) {
            this.mTargetObserver.onCompleted();
        }
        close();
    }

    public String toString() {
        return this.mConnectionId;
    }

    private void timeoutPendingRequests() {
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<Long, ContextualFuture>> it = this.mResponseFutures.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, ContextualFuture> next = it.next();
            ContextualFuture value = next.getValue();
            if (value.getCreationTime() + this.mRequestTimeoutMs >= currentTimeMillis) {
                return;
            }
            LOG.debug("Timing out request({}). Connection: {}", next.getKey(), this.mConnectionId);
            it.remove();
            value.getContext().executor().execute(() -> {
                value.completeExceptionally(new TimeoutException("Request timed out."));
            });
        }
    }

    private void failPendingRequests(Throwable th) {
        for (Map.Entry<Long, ContextualFuture> entry : this.mResponseFutures.entrySet()) {
            LOG.debug("Closing request({}) with error: {}. Connection: {}", new Object[]{entry.getKey(), th.getClass().getName(), this.mConnectionId});
            ContextualFuture value = entry.getValue();
            value.getContext().executor().execute(() -> {
                value.completeExceptionally(th);
            });
        }
    }
}
