package io.pravega.controller.store.checkpoint;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.impl.JavaSerializer;
import io.pravega.common.Exceptions;
import io.pravega.common.util.Retry;
import io.pravega.controller.store.checkpoint.CheckpointStoreException;
import java.beans.ConstructorProperties;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.WatchPathable;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/checkpoint/ZKCheckpointStore.class */
class ZKCheckpointStore implements CheckpointStore {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ZKCheckpointStore.class);
    private static final String ROOT = "eventProcessors";
    private final CuratorFramework client;
    private final Serializer<Position> positionSerializer = new Serializer<Position>() { // from class: io.pravega.controller.store.checkpoint.ZKCheckpointStore.1
        public ByteBuffer serialize(Position position) {
            return position.toBytes();
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Position m74deserialize(ByteBuffer byteBuffer) {
            return Position.fromBytes(byteBuffer);
        }
    };
    private final JavaSerializer<ReaderGroupData> groupDataSerializer = new JavaSerializer<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/controller/store/checkpoint/ZKCheckpointStore$ReaderGroupData.class */
    public static class ReaderGroupData implements Serializable {
        private final State state;
        private final List<String> readerIds;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/pravega/controller/store/checkpoint/ZKCheckpointStore$ReaderGroupData$State.class */
        public enum State {
            Active,
            Sealed
        }

        @SuppressFBWarnings(justification = "generated code")
        public State getState() {
            return this.state;
        }

        @SuppressFBWarnings(justification = "generated code")
        public List<String> getReaderIds() {
            return this.readerIds;
        }

        @SuppressFBWarnings(justification = "generated code")
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ReaderGroupData)) {
                return false;
            }
            ReaderGroupData readerGroupData = (ReaderGroupData) obj;
            if (!readerGroupData.canEqual(this)) {
                return false;
            }
            State state = getState();
            State state2 = readerGroupData.getState();
            if (state == null) {
                if (state2 != null) {
                    return false;
                }
            } else if (!state.equals(state2)) {
                return false;
            }
            List<String> readerIds = getReaderIds();
            List<String> readerIds2 = readerGroupData.getReaderIds();
            return readerIds == null ? readerIds2 == null : readerIds.equals(readerIds2);
        }

        @SuppressFBWarnings(justification = "generated code")
        protected boolean canEqual(Object obj) {
            return obj instanceof ReaderGroupData;
        }

        @SuppressFBWarnings(justification = "generated code")
        public int hashCode() {
            State state = getState();
            int hashCode = (1 * 59) + (state == null ? 43 : state.hashCode());
            List<String> readerIds = getReaderIds();
            return (hashCode * 59) + (readerIds == null ? 43 : readerIds.hashCode());
        }

        @SuppressFBWarnings(justification = "generated code")
        public String toString() {
            return "ZKCheckpointStore.ReaderGroupData(state=" + getState() + ", readerIds=" + getReaderIds() + ")";
        }

        @SuppressFBWarnings(justification = "generated code")
        @ConstructorProperties({"state", "readerIds"})
        public ReaderGroupData(State state, List<String> list) {
            this.state = state;
            this.readerIds = list;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKCheckpointStore(CuratorFramework curatorFramework) {
        this.client = curatorFramework;
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public void setPosition(String str, String str2, String str3, Position position) throws CheckpointStoreException {
        updateNode(getReaderPath(str, str2, str3), this.positionSerializer.serialize(position).array());
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public Map<String, Position> getPositions(String str, String str2) throws CheckpointStoreException {
        HashMap hashMap = new HashMap();
        String readerGroupPath = getReaderGroupPath(str, str2);
        for (String str3 : getChildren(readerGroupPath)) {
            Position position = null;
            byte[] data = getData(readerGroupPath + "/" + str3);
            if (data != null && data.length > 0) {
                position = (Position) this.positionSerializer.deserialize(ByteBuffer.wrap(data));
            }
            hashMap.put(str3, position);
        }
        return hashMap;
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public void addReaderGroup(String str, String str2) throws CheckpointStoreException {
        addNode(getReaderGroupPath(str, str2), this.groupDataSerializer.serialize(new ReaderGroupData(ReaderGroupData.State.Active, new ArrayList())).array());
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public Map<String, Position> sealReaderGroup(String str, String str2) throws CheckpointStoreException {
        try {
            updateReaderGroupData(getReaderGroupPath(str, str2), readerGroupData -> {
                return new ReaderGroupData(ReaderGroupData.State.Sealed, readerGroupData.getReaderIds());
            });
            return getPositions(str, str2);
        } catch (Exception e) {
            throw new CheckpointStoreException(e);
        } catch (KeeperException.NoNodeException e2) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NoNode, (Throwable) e2);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e3) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e3);
        }
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public void removeReaderGroup(String str, String str2) throws CheckpointStoreException {
        String readerGroupPath = getReaderGroupPath(str, str2);
        try {
            ReaderGroupData readerGroupData = (ReaderGroupData) this.groupDataSerializer.deserialize(ByteBuffer.wrap(getData(readerGroupPath)));
            if (readerGroupData.getState() == ReaderGroupData.State.Active) {
                throw new CheckpointStoreException(CheckpointStoreException.Type.Active, "ReaderGroup is active.");
            }
            if (!readerGroupData.getReaderIds().isEmpty()) {
                throw new CheckpointStoreException(CheckpointStoreException.Type.NodeNotEmpty, "ReaderGroup is not empty.");
            }
            removeEmptyNode(readerGroupPath);
        } catch (CheckpointStoreException e) {
            if (!e.getType().equals(CheckpointStoreException.Type.NoNode)) {
                throw e;
            }
        }
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public List<String> getReaderGroups(String str) throws CheckpointStoreException {
        return getChildren(getProcessPath(str));
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public void addReader(String str, String str2, String str3) throws CheckpointStoreException {
        try {
            updateReaderGroupData(getReaderGroupPath(str, str2), readerGroupData -> {
                if (readerGroupData.getState() == ReaderGroupData.State.Sealed) {
                    throw Exceptions.sneakyThrow(new CheckpointStoreException(CheckpointStoreException.Type.Sealed, "ReaderGroup is sealed"));
                }
                List<String> readerIds = readerGroupData.getReaderIds();
                if (readerIds.contains(str3)) {
                    throw Exceptions.sneakyThrow(new CheckpointStoreException(CheckpointStoreException.Type.NodeExists, "Duplicate readerId"));
                }
                readerIds.add(str3);
                return new ReaderGroupData(readerGroupData.getState(), readerIds);
            });
            addNode(getReaderPath(str, str2, str3));
        } catch (CheckpointStoreException e) {
            throw e;
        } catch (KeeperException.NoNodeException e2) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NoNode, (Throwable) e2);
        } catch (Exception e3) {
            throw new CheckpointStoreException(e3);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e4) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e4);
        }
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public void removeReader(String str, String str2, String str3) throws CheckpointStoreException {
        String readerGroupPath = getReaderGroupPath(str, str2);
        try {
            removeEmptyNode(getReaderPath(str, str2, str3));
            updateReaderGroupData(readerGroupPath, readerGroupData -> {
                List<String> readerIds = readerGroupData.getReaderIds();
                if (!readerIds.contains(str3)) {
                    return readerGroupData;
                }
                readerIds.remove(str3);
                return new ReaderGroupData(readerGroupData.getState(), readerIds);
            });
        } catch (CheckpointStoreException e) {
            throw e;
        } catch (KeeperException.NoNodeException e2) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NoNode, (Throwable) e2);
        } catch (Exception e3) {
            throw new CheckpointStoreException(e3);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e4) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e4);
        }
    }

    @Override // io.pravega.controller.store.checkpoint.CheckpointStore
    public Set<String> getProcesses() throws CheckpointStoreException {
        return (Set) getChildren(getRootPath()).stream().collect(Collectors.toSet());
    }

    private String getReaderPath(String str, String str2, String str3) {
        return String.format("/%s/%s/%s/%s", ROOT, str, str2, str3);
    }

    private String getReaderGroupPath(String str, String str2) {
        return String.format("/%s/%s/%s", ROOT, str, str2);
    }

    private String getProcessPath(String str) {
        return String.format("/%s/%s", ROOT, str);
    }

    private String getRootPath() {
        return String.format("/%s", ROOT);
    }

    private void updateReaderGroupData(String str, Function<ReaderGroupData, ReaderGroupData> function) throws Exception {
        Stat stat = new Stat();
        Retry.withExpBackoff(100L, 2, 10, 2000L).retryingOn(KeeperException.BadVersionException.class).throwingOn(Exception.class).run(() -> {
            ((BackgroundPathAndBytesable) this.client.setData().withVersion(stat.getVersion())).forPath(str, this.groupDataSerializer.serialize((ReaderGroupData) function.apply((ReaderGroupData) this.groupDataSerializer.deserialize(ByteBuffer.wrap((byte[]) ((WatchPathable) this.client.getData().storingStatIn(stat)).forPath(str))))).array());
            return null;
        });
    }

    private void addNode(String str) throws CheckpointStoreException {
        addNode(str, new byte[0]);
    }

    private void addNode(String str, byte[] bArr) throws CheckpointStoreException {
        try {
            ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)).forPath(str, bArr);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e);
        } catch (Exception e2) {
            throw new CheckpointStoreException(e2);
        } catch (KeeperException.NodeExistsException e3) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NodeExists, (Throwable) e3);
        }
    }

    private void removeEmptyNode(String str) throws CheckpointStoreException {
        try {
            this.client.delete().forPath(str);
        } catch (KeeperException.NotEmptyException e) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NodeNotEmpty, (Throwable) e);
        } catch (KeeperException.NoNodeException e2) {
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e3) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e3);
        } catch (Exception e4) {
            throw new CheckpointStoreException(e4);
        }
    }

    private void updateNode(String str, byte[] bArr) throws CheckpointStoreException {
        try {
            this.client.setData().forPath(str, bArr);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e);
        } catch (Exception e2) {
            throw new CheckpointStoreException(e2);
        } catch (KeeperException.NoNodeException e3) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NoNode, (Throwable) e3);
        }
    }

    private List<String> getChildren(String str) throws CheckpointStoreException {
        try {
            return (List) this.client.getChildren().forPath(str);
        } catch (Exception e) {
            throw new CheckpointStoreException(e);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e2) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e2);
        } catch (KeeperException.NoNodeException e3) {
            return Collections.emptyList();
        }
    }

    private byte[] getData(String str) throws CheckpointStoreException {
        try {
            return (byte[]) this.client.getData().forPath(str);
        } catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.Connectivity, (Throwable) e);
        } catch (Exception e2) {
            throw new CheckpointStoreException(e2);
        } catch (KeeperException.NoNodeException e3) {
            throw new CheckpointStoreException(CheckpointStoreException.Type.NoNode, (Throwable) e3);
        }
    }
}
