package org.opendaylight.controller.cluster.access.client;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.client.BackendInfo;
import org.opendaylight.controller.cluster.access.client.TransmitQueue;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/**
 * Base class for a client's connection to a backend. It owns a {@link TransmitQueue} of
 * {@link ConnectionEntry} instances, a timer which periodically checks the queue for timed-out requests, and a
 * "poisoned" state which, once set, causes further enqueue attempts to fail.
 *
 * <p>
 * Access to the queue and to the timer flag is serialized through an internal {@link ReentrantLock}.
 *
 * <p>
 * NOTE: this source was recovered by a decompiler (JADX). Several constructors and methods were reported by the
 * decompiler as originally package-private; their visibility is left exactly as recovered so existing callers keep
 * compiling.
 *
 * @param <T> concrete {@link BackendInfo} type
 */
@NotThreadSafe
public abstract class AbstractClientConnection<T extends BackendInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientConnection.class);

    /**
     * Backend silence (no response received) after which a non-empty connection is considered timed out: 30 seconds.
     * Also the upper bound of a single timer delay.
     */
    @VisibleForTesting
    static final long BACKEND_ALIVE_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);

    /** Time an individual entry may stay in the queue before it is failed with a timeout: 2 minutes. */
    @VisibleForTesting
    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(2);

    /** Queue stall time after which the whole connection is poisoned: 15 minutes. */
    @VisibleForTesting
    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);

    /** Throttle delays at or above this threshold (100 ms) are logged before sleeping. */
    private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);

    /** Upper bound, in seconds, of the throttle sleep performed by {@link #sendEntry(ConnectionEntry, long)}. */
    private static final long MAX_DELAY_SECONDS = 5;
    private static final long MAX_DELAY_NANOS = TimeUnit.SECONDS.toNanos(MAX_DELAY_SECONDS);

    private final Lock lock;
    private final ClientActorContext context;

    @GuardedBy("lock")
    private final TransmitQueue queue;
    private final Long cookie;

    /** True while a {@link #runTimer(ClientActorBehavior)} invocation is scheduled and has not run yet. */
    @GuardedBy("lock")
    private boolean haveTimer;

    /** Ticker reading taken at construction, at the end of a replay, or when the last response was received. */
    private long lastReceivedTicks;

    /** Non-null once this connection has been poisoned; read without the lock, hence volatile. */
    private volatile RequestException poisoned;

    /**
     * Create a connection on top of an existing queue.
     *
     * @param context client actor context, must not be null
     * @param cookie connection cookie, must not be null
     * @param queue transmit queue backing this connection, must not be null
     * @throws NullPointerException if any argument is null
     */
    public AbstractClientConnection(ClientActorContext context, Long cookie, TransmitQueue queue) {
        this.lock = new ReentrantLock();
        this.context = Preconditions.checkNotNull(context);
        this.cookie = Preconditions.checkNotNull(cookie);
        this.queue = Preconditions.checkNotNull(queue);
        this.lastReceivedTicks = currentTime();
    }

    /**
     * Create a connection which shares context, cookie and last-received timestamp with a previous connection,
     * but starts with a fresh {@link TransmitQueue.Halted} queue.
     *
     * @param oldConnection connection whose context, cookie and last-received timestamp are reused
     * @param targetDepth value handed to the {@link TransmitQueue.Halted} constructor (presumably the queue's
     *                    target depth -- confirm against {@code TransmitQueue.Halted})
     */
    public AbstractClientConnection(AbstractClientConnection<T> oldConnection, int targetDepth) {
        this.lock = new ReentrantLock();
        this.context = oldConnection.context;
        this.cookie = oldConnection.cookie;
        this.queue = new TransmitQueue.Halted(targetDepth);
        this.lastReceivedTicks = oldConnection.lastReceivedTicks;
    }

    /**
     * Return the client actor context this connection was created with.
     *
     * @return client actor context
     */
    public final ClientActorContext context() {
        return context;
    }

    /**
     * Return the cookie this connection was created with.
     *
     * @return connection cookie, never null
     */
    @Nonnull
    public final Long cookie() {
        return cookie;
    }

    /**
     * Return the actor reference reported by the context as {@code self()}.
     *
     * @return local actor reference
     */
    public final ActorRef localActor() {
        return context.self();
    }

    /**
     * Read the context's ticker.
     *
     * @return current ticker reading
     */
    public final long currentTime() {
        return context.ticker().read();
    }

    /**
     * Wrap a request and its callback into a {@link ConnectionEntry} stamped with the current time and send it
     * via {@link #sendEntry(ConnectionEntry, long)}, which may block the calling thread to apply throttling.
     *
     * @param request request to send
     * @param callback callback to invoke with the response
     */
    public final void sendRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback) {
        final long now = currentTime();
        sendEntry(new ConnectionEntry(request, callback, now), now);
    }

    /**
     * Wrap a request and its callback into a {@link ConnectionEntry} with a caller-supplied enqueue timestamp and
     * enqueue it without sleeping. The delay reported by the queue is discarded.
     *
     * @param request request to enqueue
     * @param callback callback to invoke with the response
     * @param enqueuedTicks timestamp to record as the entry's enqueue time
     */
    public final void enqueueRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback, long enqueuedTicks) {
        enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
    }

    /**
     * Enqueue an entry, scheduling the timeout timer if the queue was empty.
     *
     * @param entry entry to enqueue
     * @param now current ticker reading
     * @return value returned by {@link TransmitQueue#enqueue}; {@link #sendEntry(ConnectionEntry, long)} treats it
     *         as a delay in nanoseconds
     * @throws IllegalStateException if this connection has been poisoned
     */
    public final long enqueueEntry(ConnectionEntry entry, long now) {
        lock.lock();
        try {
            final RequestException maybePoison = poisoned;
            if (maybePoison != null) {
                throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
            }

            if (queue.isEmpty()) {
                // The queue is about to become non-empty: make sure a timer watches the new head entry.
                scheduleTimer(entry.getEnqueuedTicks() + REQUEST_TIMEOUT_NANOS - now);
            }
            return queue.enqueue(entry, now);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Return information about the backend this connection talks to, if available.
     *
     * @return optional backend information
     */
    public abstract Optional<T> getBackendInfo();

    /**
     * Acquire the connection lock and drain the queue. The lock is deliberately NOT released here: it stays held
     * until {@link #finishReplay(ReconnectForwarder)} is called by the same thread.
     *
     * @return entries drained from the queue
     */
    public final Collection<ConnectionEntry> startReplay() {
        lock.lock();
        return queue.drain();
    }

    /**
     * Install the forwarder, refresh the last-received timestamp and release the lock acquired by
     * {@link #startReplay()}.
     *
     * @param forwarder forwarder to install on the queue
     */
    @GuardedBy("lock")
    public final void finishReplay(ReconnectForwarder forwarder) {
        setForwarder(forwarder);
        lastReceivedTicks = currentTime();
        lock.unlock();
    }

    /**
     * Install a forwarder on the queue, using the current time.
     *
     * @param forwarder forwarder to install
     */
    @GuardedBy("lock")
    public final void setForwarder(ReconnectForwarder forwarder) {
        queue.setForwarder(forwarder, currentTime());
    }

    /**
     * Perform a reconnect. Invoked with the connection lock held.
     *
     * @param current current client actor behavior
     * @param cause failure which triggered the reconnect
     * @return behavior to use from now on
     */
    @GuardedBy("lock")
    abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current, RequestException cause);

    /**
     * Enqueue an entry and then block the calling thread for the delay reported by the queue, capped at
     * {@value #MAX_DELAY_SECONDS} seconds. If the thread is interrupted while sleeping, its interrupt flag is
     * restored and the method returns.
     *
     * @param entry entry to enqueue
     * @param now current ticker reading
     * @throws IllegalStateException if this connection has been poisoned
     */
    public final void sendEntry(ConnectionEntry entry, long now) {
        long delay = enqueueEntry(entry, now);
        try {
            if (delay >= DEBUG_DELAY_NANOS) {
                if (delay > MAX_DELAY_NANOS) {
                    // The trailing Throwable makes the logger record the caller's stack trace.
                    LOG.info("Capping {} throttle delay from {} to {} seconds", this,
                        TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS, new Throwable());
                    delay = MAX_DELAY_NANOS;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
                        TimeUnit.NANOSECONDS.toMillis(delay), this);
                }
            }
            TimeUnit.NANOSECONDS.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Elapsed time fills the placeholder; the exception goes last so SLF4J logs it as the throwable.
            LOG.debug("Interrupted after sleeping {}ns", currentTime() - now, e);
        }
    }

    /**
     * Acquire the connection lock and delegate to {@link #lockedReconnect(ClientActorBehavior, RequestException)}.
     *
     * @param current current client actor behavior
     * @param cause failure which triggered the reconnect
     * @return behavior to use from now on
     */
    public final ClientActorBehavior<T> reconnect(ClientActorBehavior<T> current, RequestException cause) {
        lock.lock();
        try {
            return lockedReconnect(current, cause);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Schedule {@link #runTimer(ClientActorBehavior)} to run in the actor after the given delay, unless a timer is
     * already pending or the queue has a successor. Negative delays are treated as zero and the delay never exceeds
     * {@link #BACKEND_ALIVE_TIMEOUT_NANOS}.
     *
     * @param delay requested delay in nanoseconds
     */
    @GuardedBy("lock")
    private void scheduleTimer(long delay) {
        if (haveTimer) {
            LOG.debug("{}: timer already scheduled on {}", context.persistenceId(), this);
            return;
        }
        if (queue.hasSuccessor()) {
            LOG.debug("{}: connection {} has a successor, not scheduling timer", context.persistenceId(), this);
            return;
        }

        final long normalized = delay <= 0 ? 0L : Math.min(delay, BACKEND_ALIVE_TIMEOUT_NANOS);
        final FiniteDuration dur = FiniteDuration.fromNanos(normalized);
        LOG.debug("{}: connection {} scheduling timeout in {}", context.persistenceId(), this, dur);
        context.executeInActor(this::runTimer, dur);
        haveTimer = true;
    }

    /**
     * Timer callback. Poisons the connection if the queue has stalled for {@link #NO_PROGRESS_TIMEOUT_NANOS},
     * reconnects if the backend has been silent for too long, and otherwise times out expired entries and
     * re-arms the timer if entries remain.
     *
     * @param current current client actor behavior
     * @return behavior to use from now on
     */
    @VisibleForTesting
    final ClientActorBehavior<T> runTimer(ClientActorBehavior<T> current) {
        lock.lock();
        try {
            haveTimer = false;
            final long now = currentTime();
            LOG.debug("{}: running timer on {}", context.persistenceId(), this);

            final long ticksSinceProgress = queue.ticksStalling(now);
            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));

                lockedPoison(new NoProgressException(ticksSinceProgress));
                current.removeConnection(this);
                // The lock is released exactly once, by the finally block below.
                return current;
            }

            // A null return (as opposed to an empty Optional) signals that the backend timed out.
            final Optional<Long> delay = lockedCheckTimeout(now);
            if (delay == null) {
                LOG.debug("{}: connection {} timed out", context.persistenceId(), this);
                return lockedReconnect(current,
                    new RuntimeRequestException("Backend connection timed out", new TimeoutException()));
            }

            if (delay.isPresent()) {
                scheduleTimer(delay.get());
            } else {
                LOG.debug("{}: not scheduling timeout on {}", context.persistenceId(), this);
            }
            return current;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Locking wrapper around {@link #lockedCheckTimeout(long)}.
     *
     * @param now current ticker reading
     * @return see {@link #lockedCheckTimeout(long)}; may be {@code null}
     */
    @VisibleForTesting
    final Optional<Long> checkTimeout(long now) {
        lock.lock();
        try {
            return lockedCheckTimeout(now);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Compute how long ago the last-received timestamp was recorded.
     *
     * @param now current ticker reading
     * @return {@code now} minus the last-received timestamp
     */
    long backendSilentTicks(long now) {
        return now - lastReceivedTicks;
    }

    /**
     * Check the queue for timeouts. Entries which have been enqueued for at least {@link #REQUEST_TIMEOUT_NANOS}
     * are removed and completed with a {@link RequestTimeoutException} failure.
     *
     * @param now current ticker reading
     * @return {@code null} if the queue is non-empty and the backend has been silent for at least
     *         {@link #BACKEND_ALIVE_TIMEOUT_NANOS}; an empty Optional if the queue is (or has become) empty;
     *         otherwise the number of nanoseconds until the head entry times out
     */
    @SuppressFBWarnings(value = {"NP_OPTIONAL_RETURN_NULL"}, justification = "Returning null Optional is documented in the API contract.")
    @GuardedBy("lock")
    private Optional<Long> lockedCheckTimeout(long now) {
        if (queue.isEmpty()) {
            LOG.debug("{}: connection {} is empty", context.persistenceId(), this);
            return Optional.empty();
        }

        final long backendSilentTicks = backendSilentTicks(now);
        if (backendSilentTicks >= BACKEND_ALIVE_TIMEOUT_NANOS) {
            LOG.debug("{}: Connection {} has not seen activity from backend for {} nanoseconds, timing out",
                context.persistenceId(), this, backendSilentTicks);
            return null;
        }

        int tasksTimedOut = 0;
        for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
            final long beenOpen = now - head.getEnqueuedTicks();
            if (beenOpen < REQUEST_TIMEOUT_NANOS) {
                // The head entry is still within its budget: report when it will expire.
                return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
            }

            tasksTimedOut++;
            queue.remove(now);
            LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
            head.complete(head.getRequest().toRequestFailure(
                new RequestTimeoutException("Timed out after " + beenOpen / 1.0E9d + "seconds")));
        }

        LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
        if (tasksTimedOut != 0) {
            queue.tryTransmit(now);
        }
        return Optional.empty();
    }

    /**
     * Poison this connection: record the cause and poison the queue with it. Subsequent calls to
     * {@link #enqueueEntry(ConnectionEntry, long)} throw {@link IllegalStateException}.
     *
     * @param cause reason for poisoning
     */
    public final void poison(RequestException cause) {
        lock.lock();
        try {
            lockedPoison(cause);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Record the (possibly enriched) cause as the poison marker and poison the queue with the original cause.
     *
     * @param cause reason for poisoning
     */
    @GuardedBy("lock")
    private void lockedPoison(RequestException cause) {
        poisoned = enrichPoison(cause);
        queue.poison(cause);
    }

    /**
     * Transform the poison cause before it is stored. This implementation returns the cause unchanged.
     *
     * @param ex original cause
     * @return cause to store as the poison marker
     */
    RequestException enrichPoison(RequestException ex) {
        return ex;
    }

    /**
     * Return the stored poison marker.
     *
     * @return poison cause, or {@code null} if this connection has not been poisoned
     */
    @VisibleForTesting
    final RequestException poisoned() {
        return poisoned;
    }

    /**
     * Process a response: refresh the last-received timestamp, match the envelope against the queue under the
     * lock and, if an entry was matched, complete it with the response message after the lock has been released.
     *
     * @param envelope received response envelope
     */
    public void receiveResponse(ResponseEnvelope<?> envelope) {
        final long now = currentTime();
        lastReceivedTicks = now;

        final Optional<TransmittedConnectionEntry> maybeEntry;
        lock.lock();
        try {
            maybeEntry = queue.complete(envelope, now);
        } finally {
            lock.unlock();
        }

        // The callback runs outside of the lock; a failure here must not touch the lock again.
        if (maybeEntry.isPresent()) {
            final TransmittedConnectionEntry entry = maybeEntry.get();
            LOG.debug("Completing {} with {}", entry, envelope);
            entry.complete((Response<?, ?>) envelope.getMessage());
        }
    }

    @Override
    public final String toString() {
        return addToStringAttributes(MoreObjects.toStringHelper(this).omitNullValues()).toString();
    }

    /**
     * Add this connection's attributes (client identifier, cookie, poison marker) to a string helper.
     *
     * @param toStringHelper helper to populate
     * @return the same helper, for chaining
     */
    public MoreObjects.ToStringHelper addToStringAttributes(MoreObjects.ToStringHelper toStringHelper) {
        // NOTE(review): "m6getIdentifier" looks like a decompiler-renamed accessor -- confirm the real method name.
        return toStringHelper.add("client", context.m6getIdentifier()).add("cookie", cookie).add("poisoned", poisoned);
    }
}
