package io.pravega.client.connection.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.metrics.ClientMetricUpdater;
import io.pravega.shared.metrics.MetricListener;
import io.pravega.shared.metrics.MetricNotifier;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/connection/impl/ConnectionPoolImpl.class */
public class ConnectionPoolImpl implements ConnectionPool {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ConnectionPoolImpl.class);

    @VisibleForTesting
    private final ClientConfig clientConfig;
    private final MetricNotifier metricNotifier;
    private final ConnectionFactory connectionFactory;
    private final Object lock = new Object();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    @GuardedBy("lock")
    private final Map<PravegaNodeUri, List<Connection>> connectionMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/client/connection/impl/ConnectionPoolImpl$Connection.class */
    public class Connection implements Comparable<Connection>, AutoCloseable {
        private final PravegaNodeUri uri;
        private final CompletableFuture<FlowHandler> flowHandler;

        int getFlowCount() {
            if (Futures.isSuccessful(this.flowHandler)) {
                return this.flowHandler.join().getOpenFlowCount();
            }
            return 0;
        }

        boolean isConnected() {
            return Futures.isSuccessful(this.flowHandler) && !this.flowHandler.join().isClosed();
        }

        @Override // java.lang.Comparable
        public int compareTo(Connection connection) {
            return Integer.compare(Futures.isSuccessful(getFlowHandler()) ? getFlowCount() : Integer.MAX_VALUE, Futures.isSuccessful(connection.getFlowHandler()) ? connection.getFlowCount() : Integer.MAX_VALUE);
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            if (Futures.isSuccessful(this.flowHandler)) {
                this.flowHandler.join().close();
            }
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"uri", "flowHandler"})
        public Connection(PravegaNodeUri pravegaNodeUri, CompletableFuture<FlowHandler> completableFuture) {
            this.uri = pravegaNodeUri;
            this.flowHandler = completableFuture;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public PravegaNodeUri getUri() {
            return this.uri;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public CompletableFuture<FlowHandler> getFlowHandler() {
            return this.flowHandler;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Connection)) {
                return false;
            }
            Connection connection = (Connection) obj;
            if (!connection.canEqual(this)) {
                return false;
            }
            PravegaNodeUri uri = getUri();
            PravegaNodeUri uri2 = connection.getUri();
            if (uri == null) {
                if (uri2 != null) {
                    return false;
                }
            } else if (!uri.equals(uri2)) {
                return false;
            }
            CompletableFuture<FlowHandler> flowHandler = getFlowHandler();
            CompletableFuture<FlowHandler> flowHandler2 = connection.getFlowHandler();
            return flowHandler == null ? flowHandler2 == null : flowHandler.equals(flowHandler2);
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof Connection;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            PravegaNodeUri uri = getUri();
            int hashCode = (1 * 59) + (uri == null ? 43 : uri.hashCode());
            CompletableFuture<FlowHandler> flowHandler = getFlowHandler();
            return (hashCode * 59) + (flowHandler == null ? 43 : flowHandler.hashCode());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "ConnectionPoolImpl.Connection(uri=" + getUri() + ", flowHandler=" + getFlowHandler() + ")";
        }
    }

    public ConnectionPoolImpl(ClientConfig clientConfig, ConnectionFactory connectionFactory) {
        this.clientConfig = clientConfig;
        this.connectionFactory = connectionFactory;
        MetricListener metricListener = clientConfig.getMetricListener();
        this.metricNotifier = metricListener == null ? MetricNotifier.NO_OP_METRIC_NOTIFIER : new ClientMetricUpdater(metricListener);
    }

    @Override // io.pravega.client.connection.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(Flow flow, PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        Connection connection;
        CompletableFuture thenApply;
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkNotNull(flow, "Flow");
        Preconditions.checkNotNull(pravegaNodeUri, "Location");
        Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
        synchronized (this.lock) {
            Exceptions.checkNotClosed(this.closed.get(), this);
            List<Connection> list = (List) this.connectionMap.getOrDefault(pravegaNodeUri, new ArrayList()).stream().filter(connection2 -> {
                return !connection2.getFlowHandler().isDone() || connection2.isConnected();
            }).collect(Collectors.toList());
            log.debug("List of connections to {} that can be used: {}", pravegaNodeUri, list);
            Optional<Connection> min = list.stream().min(Comparator.naturalOrder());
            if (!min.isPresent() || (list.size() < this.clientConfig.getMaxConnectionsPerSegmentStore() && !isUnused(min.get()))) {
                log.trace("Creating a new connection to {}", pravegaNodeUri);
                connection = new Connection(pravegaNodeUri, establishConnection(pravegaNodeUri));
                list.add(connection);
            } else {
                log.trace("Reusing connection: {}", min.get());
                connection = min.get();
            }
            this.connectionMap.put(pravegaNodeUri, list);
            thenApply = connection.getFlowHandler().thenApply(flowHandler -> {
                return flowHandler.createFlow(flow, replyProcessor);
            });
        }
        return thenApply;
    }

    @Override // io.pravega.client.connection.impl.ConnectionPool
    public CompletableFuture<ClientConnection> getClientConnection(PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor) {
        Preconditions.checkNotNull(pravegaNodeUri, "Location");
        Preconditions.checkNotNull(replyProcessor, "ReplyProcessor");
        Exceptions.checkNotClosed(this.closed.get(), this);
        return new Connection(pravegaNodeUri, establishConnection(pravegaNodeUri)).getFlowHandler().thenApply(flowHandler -> {
            return flowHandler.createConnectionWithFlowDisabled(replyProcessor);
        });
    }

    @Override // io.pravega.client.connection.impl.ConnectionPool
    public void getClientConnection(Flow flow, PravegaNodeUri pravegaNodeUri, ReplyProcessor replyProcessor, CompletableFuture<ClientConnection> completableFuture) {
        CompletableFuture<ClientConnection> clientConnection = getClientConnection(flow, pravegaNodeUri, replyProcessor);
        Objects.requireNonNull(completableFuture);
        clientConnection.thenApply((v1) -> {
            return r1.complete(v1);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return Boolean.valueOf(completableFuture.completeExceptionally(new ConnectionFailedException(th)));
        });
    }

    private static boolean isUnused(Connection connection) {
        return Futures.isSuccessful(connection.getFlowHandler()) && connection.getFlowCount() == 0;
    }

    @VisibleForTesting
    public void pruneUnusedConnections() {
        synchronized (this.lock) {
            Iterator<List<Connection>> it = this.connectionMap.values().iterator();
            while (it.hasNext()) {
                Iterator<Connection> it2 = it.next().iterator();
                while (it2.hasNext()) {
                    Connection next = it2.next();
                    if (isUnused(next)) {
                        next.getFlowHandler().join().close();
                        it2.remove();
                    }
                }
            }
        }
    }

    @VisibleForTesting
    public List<Connection> getActiveChannels() {
        ArrayList arrayList;
        synchronized (this.lock) {
            arrayList = new ArrayList();
            Iterator<List<Connection>> it = this.connectionMap.values().iterator();
            while (it.hasNext()) {
                arrayList.addAll(it.next());
            }
        }
        return arrayList;
    }

    private CompletableFuture<FlowHandler> establishConnection(PravegaNodeUri pravegaNodeUri) {
        return FlowHandler.openConnection(pravegaNodeUri, this.metricNotifier, this.connectionFactory);
    }

    @Override // io.pravega.client.connection.impl.ConnectionPool, java.lang.AutoCloseable
    public void close() {
        log.info("Shutting down connection pool");
        if (this.closed.compareAndSet(false, true)) {
            this.metricNotifier.close();
            this.connectionFactory.close();
            synchronized (this.lock) {
                Iterator<List<Connection>> it = this.connectionMap.values().iterator();
                while (it.hasNext()) {
                    Iterator<Connection> it2 = it.next().iterator();
                    while (it2.hasNext()) {
                        it2.next().close();
                    }
                }
                this.connectionMap.clear();
            }
        }
    }

    @Override // io.pravega.client.connection.impl.ConnectionPool
    public ScheduledExecutorService getInternalExecutor() {
        return this.connectionFactory.getInternalExecutor();
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ClientConfig getClientConfig() {
        return this.clientConfig;
    }
}
