package org.ehcache.clustered.client.internal.store;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import org.ehcache.clustered.client.config.Timeouts;
import org.ehcache.clustered.client.internal.store.ServerStoreProxy;
import org.ehcache.clustered.common.internal.messages.ClusterTierReconnectMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.store.Chain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ehcache/clustered/client/internal/store/StrongServerStoreProxy.class */
public class StrongServerStoreProxy implements ServerStoreProxy {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrongServerStoreProxy.class);
    private final CommonServerStoreProxy delegate;
    private final ConcurrentMap<Long, CountDownLatch> hashInvalidationsInProgress = new ConcurrentHashMap();
    private final AtomicReference<CountDownLatch> invalidateAllLatch = new AtomicReference<>();
    private final ClusterTierClientEntity entity;

    public StrongServerStoreProxy(String str, ClusterTierClientEntity clusterTierClientEntity, ServerStoreProxy.ServerCallback serverCallback) {
        this.delegate = new CommonServerStoreProxy(str, clusterTierClientEntity, serverCallback);
        this.entity = clusterTierClientEntity;
        this.delegate.addResponseListener(EhcacheEntityResponse.HashInvalidationDone.class, this::hashInvalidationDoneResponseListener);
        this.delegate.addResponseListener(EhcacheEntityResponse.AllInvalidationDone.class, this::allInvalidationDoneResponseListener);
        clusterTierClientEntity.addReconnectListener(this::reconnectListener);
        clusterTierClientEntity.addDisconnectionListener(this::disconnectionListener);
    }

    private void disconnectionListener() {
        Iterator<CountDownLatch> it = this.hashInvalidationsInProgress.values().iterator();
        while (it.hasNext()) {
            it.next().countDown();
        }
        this.hashInvalidationsInProgress.clear();
        CountDownLatch countDownLatch = this.invalidateAllLatch.get();
        if (countDownLatch != null) {
            countDownLatch.countDown();
        }
    }

    private void allInvalidationDoneResponseListener(EhcacheEntityResponse.AllInvalidationDone allInvalidationDone) {
        LOGGER.debug("CLIENT: on cache {}, server notified that clients invalidated all", getCacheId());
        CountDownLatch andSet = this.invalidateAllLatch.getAndSet(null);
        if (andSet != null) {
            LOGGER.debug("CLIENT: on cache {}, count down", getCacheId());
            andSet.countDown();
        }
    }

    private void hashInvalidationDoneResponseListener(EhcacheEntityResponse.HashInvalidationDone hashInvalidationDone) {
        long key = hashInvalidationDone.getKey();
        LOGGER.debug("CLIENT: on cache {}, server notified that clients invalidated hash {}", getCacheId(), Long.valueOf(key));
        CountDownLatch remove = this.hashInvalidationsInProgress.remove(Long.valueOf(key));
        if (remove != null) {
            remove.countDown();
        }
    }

    private void reconnectListener(ClusterTierReconnectMessage clusterTierReconnectMessage) {
        clusterTierReconnectMessage.addInvalidationsInProgress(this.hashInvalidationsInProgress.keySet());
        if (this.invalidateAllLatch.get() != null) {
            clusterTierReconnectMessage.clearInProgress();
        }
    }

    private <T> T performWaitingForHashInvalidation(long j, Callable<T> callable, Duration duration) throws TimeoutException {
        LongSupplier nanosStartingFromNow = Timeouts.nanosStartingFromNow(duration);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        while (this.entity.isConnected()) {
            CountDownLatch putIfAbsent = this.hashInvalidationsInProgress.putIfAbsent(Long.valueOf(j), countDownLatch);
            if (putIfAbsent == null) {
                try {
                    T call = callable.call();
                    LOGGER.debug("CLIENT: Waiting for invalidations on key {}", Long.valueOf(j));
                    awaitOnLatch(countDownLatch, nanosStartingFromNow);
                    LOGGER.debug("CLIENT: key {} invalidated on all clients, unblocking call", Long.valueOf(j));
                    return call;
                } catch (Exception e) {
                    this.hashInvalidationsInProgress.remove(Long.valueOf(j));
                    countDownLatch.countDown();
                    if (e instanceof TimeoutException) {
                        throw ((TimeoutException) e);
                    }
                    if (e instanceof ServerStoreProxyException) {
                        throw ((ServerStoreProxyException) e);
                    }
                    throw new RuntimeException(e);
                }
            }
            awaitOnLatch(putIfAbsent, nanosStartingFromNow);
        }
        throw new IllegalStateException("Cluster tier manager disconnected");
    }

    private <T> T performWaitingForAllInvalidation(Callable<T> callable, Duration duration) throws TimeoutException {
        LongSupplier nanosStartingFromNow = Timeouts.nanosStartingFromNow(duration);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        while (this.entity.isConnected()) {
            if (this.invalidateAllLatch.compareAndSet(null, countDownLatch)) {
                try {
                    T call = callable.call();
                    awaitOnLatch(countDownLatch, nanosStartingFromNow);
                    LOGGER.debug("CLIENT: all invalidated on all clients, unblocking call");
                    return call;
                } catch (Exception e) {
                    this.invalidateAllLatch.set(null);
                    countDownLatch.countDown();
                    if (e instanceof TimeoutException) {
                        throw ((TimeoutException) e);
                    }
                    if (e instanceof ServerStoreProxyException) {
                        throw ((ServerStoreProxyException) e);
                    }
                    throw new RuntimeException(e);
                }
            }
            CountDownLatch countDownLatch2 = this.invalidateAllLatch.get();
            if (countDownLatch2 != null) {
                awaitOnLatch(countDownLatch2, nanosStartingFromNow);
            }
        }
        throw new IllegalStateException("Cluster tier manager disconnected");
    }

    private void awaitOnLatch(CountDownLatch countDownLatch, LongSupplier longSupplier) throws TimeoutException {
        boolean interrupted = Thread.interrupted();
        while (true) {
            try {
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            } catch (Throwable th) {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                throw th;
            }
        }
        if (!countDownLatch.await(longSupplier.getAsLong(), TimeUnit.NANOSECONDS)) {
            throw new TimeoutException();
        }
        if (!this.entity.isConnected()) {
            throw new IllegalStateException("Cluster tier manager disconnected");
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.ehcache.clustered.client.internal.store.ServerStoreProxy
    public String getCacheId() {
        return this.delegate.getCacheId();
    }

    @Override // org.ehcache.clustered.client.internal.store.ServerStoreProxy
    public void close() {
        this.delegate.close();
    }

    @Override // org.ehcache.clustered.client.internal.store.ServerStoreProxy, org.ehcache.clustered.common.internal.store.ServerStore
    public ServerStoreProxy.ChainEntry get(long j) throws TimeoutException {
        return this.delegate.get(j);
    }

    @Override // org.ehcache.clustered.common.internal.store.ServerStore
    public void append(long j, ByteBuffer byteBuffer) throws TimeoutException {
        performWaitingForHashInvalidation(j, () -> {
            this.delegate.append(j, byteBuffer);
            return null;
        }, this.entity.getTimeouts().getWriteOperationTimeout());
    }

    @Override // org.ehcache.clustered.client.internal.store.ServerStoreProxy, org.ehcache.clustered.common.internal.store.ServerStore
    public ServerStoreProxy.ChainEntry getAndAppend(long j, ByteBuffer byteBuffer) throws TimeoutException {
        return (ServerStoreProxy.ChainEntry) performWaitingForHashInvalidation(j, () -> {
            return this.delegate.getAndAppend(j, byteBuffer);
        }, this.entity.getTimeouts().getWriteOperationTimeout());
    }

    @Override // org.ehcache.clustered.client.internal.store.ServerStoreProxy
    public void enableEvents(boolean z) {
        this.delegate.enableEvents(z);
    }

    @Override // org.ehcache.clustered.common.internal.store.ServerStore
    public void replaceAtHead(long j, Chain chain, Chain chain2) {
        this.delegate.replaceAtHead(j, chain, chain2);
    }

    @Override // org.ehcache.clustered.common.internal.store.ServerStore
    public void clear() throws TimeoutException {
        performWaitingForAllInvalidation(() -> {
            this.delegate.clear();
            return null;
        }, this.entity.getTimeouts().getWriteOperationTimeout());
    }

    @Override // org.ehcache.clustered.common.internal.store.ServerStore
    public Iterator<Map.Entry<Long, Chain>> iterator() throws TimeoutException {
        return this.delegate.iterator();
    }
}
