package io.servicetalk.http.utils;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.DelegatingConnectionFactory;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.transport.api.TransportObserver;
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/utils/CacheConnectionFactory.class */
public final class CacheConnectionFactory<ResolvedAddress, C extends ListenableAsyncCloseable> extends DelegatingConnectionFactory<ResolvedAddress, C> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) CacheConnectionFactory.class);
    private final Map<ResolvedAddress, Item<C>> map;
    private final ToIntFunction<ResolvedAddress> maxConcurrencyFunc;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/http/utils/CacheConnectionFactory$Item.class */
    public static final class Item<C> {

        @Nullable
        Single<C> single;
        private int subscriberCount;

        private Item() {
            this.subscriberCount = 1;
        }

        @Nullable
        Single<C> addSubscriber(int i) {
            if (this.subscriberCount >= i) {
                return null;
            }
            this.subscriberCount++;
            return this.single;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CacheConnectionFactory(ConnectionFactory<ResolvedAddress, C> connectionFactory, ToIntFunction<ResolvedAddress> toIntFunction) {
        super(connectionFactory);
        this.map = new HashMap();
        this.maxConcurrencyFunc = toIntFunction;
    }

    @Override // io.servicetalk.client.api.DelegatingConnectionFactory, io.servicetalk.client.api.ConnectionFactory
    @Deprecated
    public Single<C> newConnection(ResolvedAddress resolvedaddress, @Nullable TransportObserver transportObserver) {
        return newConnection(resolvedaddress, null, transportObserver);
    }

    @Override // io.servicetalk.client.api.DelegatingConnectionFactory, io.servicetalk.client.api.ConnectionFactory
    public Single<C> newConnection(ResolvedAddress resolvedaddress, @Nullable ContextMap contextMap, @Nullable TransportObserver transportObserver) {
        return Single.defer(() -> {
            Single<C> single;
            int applyAsInt = this.maxConcurrencyFunc.applyAsInt(resolvedaddress);
            if (applyAsInt <= 1) {
                return delegate().newConnection(resolvedaddress, contextMap, transportObserver);
            }
            synchronized (this.map) {
                Item<C> item = this.map.get(resolvedaddress);
                if (item != null) {
                    Single<C> addSubscriber = item.addSubscriber(applyAsInt);
                    single = addSubscriber;
                }
                Item<C> item2 = new Item<>();
                this.map.put(resolvedaddress, item2);
                Single<C> liftSync = delegate().newConnection(resolvedaddress, contextMap, transportObserver).liftSync(subscriber -> {
                    return new SingleSource.Subscriber<C>() { // from class: io.servicetalk.http.utils.CacheConnectionFactory.1
                        static final /* synthetic */ boolean $assertionsDisabled;

                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onSubscribe(Cancellable cancellable) {
                            SingleSource.Subscriber subscriber = subscriber;
                            Object obj = resolvedaddress;
                            Item item3 = item2;
                            subscriber.onSubscribe(() -> {
                                try {
                                    if (!$assertionsDisabled && !Thread.holdsLock(CacheConnectionFactory.this.map)) {
                                        throw new AssertionError();
                                    }
                                    CacheConnectionFactory.this.map.remove(obj, item3);
                                    cancellable.cancel();
                                } catch (Throwable th) {
                                    cancellable.cancel();
                                    throw th;
                                }
                            });
                        }

                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onSuccess(@Nullable C c) {
                            try {
                                if (c == null) {
                                    lockRemoveFromMap();
                                } else {
                                    c.onClosing().whenFinally(this::lockRemoveFromMap).subscribe();
                                }
                                subscriber.onSuccess(c);
                            } catch (Throwable th) {
                                if (c != null) {
                                    CacheConnectionFactory.LOGGER.debug("Unexpected error, closing connection='{}'", c, th);
                                    c.closeAsync().subscribe();
                                }
                                subscriber.onError(th);
                            }
                        }

                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onError(Throwable th) {
                            lockRemoveFromMap();
                            subscriber.onError(th);
                        }

                        private void lockRemoveFromMap() {
                            synchronized (CacheConnectionFactory.this.map) {
                                CacheConnectionFactory.this.map.remove(resolvedaddress, item2);
                            }
                        }

                        static {
                            $assertionsDisabled = !CacheConnectionFactory.class.desiredAssertionStatus();
                        }
                    };
                }).cache().liftSync(subscriber2 -> {
                    return new SingleSource.Subscriber<C>() { // from class: io.servicetalk.http.utils.CacheConnectionFactory.2
                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onSubscribe(Cancellable cancellable) {
                            subscriber2.onSubscribe(() -> {
                                synchronized (CacheConnectionFactory.this.map) {
                                    cancellable.cancel();
                                }
                            });
                        }

                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onSuccess(@Nullable C c) {
                            try {
                                subscriber2.onSuccess(c);
                            } finally {
                                lockRemoveFromMap();
                            }
                        }

                        @Override // io.servicetalk.concurrent.SingleSource.Subscriber
                        public void onError(Throwable th) {
                            try {
                                subscriber2.onError(th);
                            } finally {
                                lockRemoveFromMap();
                            }
                        }

                        private void lockRemoveFromMap() {
                            synchronized (CacheConnectionFactory.this.map) {
                                CacheConnectionFactory.this.map.remove(resolvedaddress, item2);
                            }
                        }
                    };
                });
                item2.single = liftSync;
                single = liftSync;
            }
            return single.shareContextOnSubscribe();
        });
    }
}
