package org.opendaylight.controller.cluster.databroker.actors.dds;

import akka.actor.ActorRef;
import akka.actor.Status;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Stream;
import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.client.ReconnectForwarder;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.class */
/**
 * Base client-actor behavior backing a {@link DataStoreClient}.
 *
 * <p>It keeps a registry of the {@link ClientLocalHistory} instances it has handed out, serves stand-alone
 * transactions and snapshots from one shared {@link SingleClientHistory}, and drives all of them through a reconnect
 * when a backend connection comes up.
 *
 * <p>Locking: {@link #lock} is write-held while operations are being aborted and for the whole duration of a
 * reconnect (from {@code connectionUp} until the returned cohort has finished), and read-held while a new local
 * history is being registered.
 *
 * <p>NOTE(review): this source was produced by a decompiler, which reports having widened the class, its constructor
 * and several methods from package-private to public.
 */
public abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior<ShardBackendInfo> implements DataStoreClient {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreClientBehavior.class);

    /** Local histories created via {@link #createLocalHistory()}, keyed by their identifier. */
    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
    /** Source of history numbers for new local histories; starts at 1 because 0 is used by {@link #singleHistory}. */
    private final AtomicLong nextHistoryId = new AtomicLong(1L);
    /** Guards {@link #aborted} and the set of histories against concurrent abort/reconnect/creation. */
    private final StampedLock lock = new StampedLock();
    /** History (number 0) backing {@link #createTransaction()} and {@link #createSnapshot()}. */
    private final SingleClientHistory singleHistory;
    /** Cause recorded by {@link #abortOperations(Throwable)}; {@code null} until operations have been aborted. */
    private volatile Throwable aborted;

    /**
     * Creates the behavior and its single shared history.
     *
     * @param context client actor context, passed to the superclass
     * @param resolver shard backend resolver, passed to the superclass
     */
    public AbstractDataStoreClientBehavior(ClientActorContext context, AbstractShardBackendResolver resolver) {
        super(context, resolver);
        this.singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0L));
    }

    /**
     * Halts this client, aborting all tracked local histories with the supplied cause.
     *
     * <p>If operations have already been aborted there is nothing left to clean up, so the call is a no-op. The
     * previous guard was inverted ({@code aborted != null}): since {@code aborted} is only ever set by
     * {@link #abortOperations(Throwable)}, a first halt never aborted anything.
     *
     * @param cause reason the client is being halted
     */
    protected final void haltClient(Throwable cause) {
        if (aborted == null) {
            abortOperations(cause);
        }
    }

    /**
     * Records the abort cause, locally aborts every tracked history and forgets them, all under the write lock.
     *
     * @param cause failure to record and to propagate to each history
     */
    private void abortOperations(Throwable cause) {
        final long stamp = lock.writeLock();
        try {
            aborted = cause;
            for (ClientLocalHistory history : histories.values()) {
                history.localAbort(cause);
            }
            histories.clear();
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /**
     * Actor-side half of {@link #close()}: aborts outstanding operations with an {@link IllegalStateException}.
     *
     * @param currentBehavior behavior the callback was invoked with (unused)
     * @return always {@code null}
     */
    private AbstractDataStoreClientBehavior shutdown(ClientActorBehavior<ShardBackendInfo> currentBehavior) {
        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
        return null;
    }

    /**
     * Handles a command message: answers {@link GetClientRequest} with a {@link Status.Success} wrapping this
     * instance and logs a warning for anything else.
     *
     * <p>NOTE(review): the decompiler reports this method was originally named {@code onCommand} and was renamed
     * when it merged the bridge method; the generated name is kept here so existing references stay valid.
     *
     * @param command incoming message
     * @return this behavior
     */
    public final AbstractDataStoreClientBehavior m23onCommand(Object command) {
        if (command instanceof GetClientRequest) {
            ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
        } else {
            LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
        }
        return this;
    }

    /**
     * Begins switching all histories over to a newly-connected backend connection.
     *
     * <p>The write lock is acquired here and deliberately stays held after this method returns; it is released by
     * {@link #finishReconnect} when the returned cohort is invoked. If starting the reconnect fails part-way, the
     * cohorts started so far are closed and the lock is released before the failure is rethrown, so a failed
     * reconnect cannot leave the lock held forever.
     *
     * @param newConn the connection that has just come up
     * @return cohort which, given the previous connection's entries, completes the reconnect
     */
    protected final ClientActorBehavior.ConnectionConnectCohort connectionUp(ConnectedClientConnection<ShardBackendInfo> newConn) {
        final long stamp = lock.writeLock();
        final Collection<HistoryReconnectCohort> cohorts = new ArrayList<>();
        try {
            startReconnect(singleHistory, newConn, cohorts);
            for (ClientLocalHistory history : histories.values()) {
                startReconnect(history, newConn, cohorts);
            }
        } catch (RuntimeException | Error e) {
            closeCohortsAndUnlock(cohorts, stamp);
            throw e;
        }

        return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
    }

    /**
     * Completes a reconnect started by {@link #connectionUp}: replays the previous entries into every cohort and
     * builds the forwarder for them. Whether that succeeds or fails, every cohort is then closed and the write lock
     * is released exactly once.
     *
     * @param newConn the connection being switched to
     * @param stamp write-lock stamp obtained in {@link #connectionUp}
     * @param cohorts reconnect cohorts collected in {@link #connectionUp}
     * @param previousEntries entries from the previous connection to replay
     * @return forwarder produced by {@link BouncingReconnectForwarder#forCohorts}
     */
    private ReconnectForwarder finishReconnect(ConnectedClientConnection<ShardBackendInfo> newConn, long stamp, Collection<HistoryReconnectCohort> cohorts, Collection<ConnectionEntry> previousEntries) {
        try {
            for (HistoryReconnectCohort cohort : cohorts) {
                cohort.replayRequests(previousEntries);
            }
            return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
        } finally {
            closeCohortsAndUnlock(cohorts, stamp);
        }
    }

    /**
     * Closes every cohort and then releases the write lock; the unlock happens even if a {@code close()} throws.
     *
     * @param cohorts cohorts to close
     * @param stamp write-lock stamp to release
     */
    private void closeCohortsAndUnlock(Collection<HistoryReconnectCohort> cohorts, long stamp) {
        try {
            for (HistoryReconnectCohort cohort : cohorts) {
                cohort.close();
            }
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    /**
     * Asks a history to start reconnecting and collects the resulting cohort, if it produced one.
     *
     * @param history history to reconnect
     * @param newConn the connection being switched to
     * @param cohorts collection receiving the non-null cohort
     */
    private static void startReconnect(AbstractClientHistory history, ConnectedClientConnection<ShardBackendInfo> newConn, Collection<HistoryReconnectCohort> cohorts) {
        final HistoryReconnectCohort cohort = history.startReconnect(newConn);
        if (cohort != null) {
            cohorts.add(cohort);
        }
    }

    /**
     * Creates and registers a new local history with the next free history number.
     *
     * @return the newly-created history
     * @throws RuntimeException if operations have been aborted: the recorded cause is rethrown as-is when it is
     *         unchecked, otherwise wrapped in a {@link RuntimeException}
     */
    @Override
    public final ClientLocalHistory createLocalHistory() {
        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId.getAndIncrement());

        final long stamp = lock.readLock();
        try {
            // 'aborted' is only written under the write lock, so a single read is stable while we hold the read lock.
            final Throwable cause = aborted;
            if (cause != null) {
                Throwables.throwIfUnchecked(cause);
                throw new RuntimeException(cause);
            }

            final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
            LOG.debug("{}: creating a new local history {}", persistenceId(), history);

            // Identifiers come from a monotonically increasing counter, so a collision indicates a bug.
            Verify.verify(histories.put(historyId, history) == null);
            return history;
        } finally {
            lock.unlockRead(stamp);
        }
    }

    /**
     * Creates a transaction on the single shared history.
     *
     * @return the new transaction
     */
    @Override
    public final ClientTransaction createTransaction() {
        return singleHistory.createTransaction();
    }

    /**
     * Takes a snapshot on the single shared history.
     *
     * @return the new snapshot
     */
    @Override
    public final ClientSnapshot createSnapshot() {
        return singleHistory.takeSnapshot();
    }

    /**
     * Closes the client: delegates to the superclass and then schedules {@link #shutdown} via
     * {@code context().executeInActor}.
     */
    @Override
    public void close() {
        super.close();
        context().executeInActor(this::shutdown);
    }

    /**
     * Resolves the shard responsible for a path.
     *
     * @param path instance identifier to resolve
     * @return shard cookie for the path
     */
    public abstract Long resolveShardForPath(YangInstanceIdentifier path);

    /**
     * Resolves all shards known to this client.
     *
     * @return stream of shard cookies
     */
    public abstract Stream<Long> resolveAllShards();

    /**
     * Returns the {@link ActorUtils} held by this behavior's {@link AbstractShardBackendResolver}.
     *
     * @return the resolver's actor utilities
     */
    public final ActorUtils actorUtils() {
        return ((AbstractShardBackendResolver) resolver()).actorUtils();
    }
}
