package io.pravega.controller.store.stream;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.store.stream.CreateStreamResponse;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.tables.ActiveTxnRecord;
import io.pravega.controller.store.stream.tables.Cache;
import io.pravega.controller.store.stream.tables.CompletedTxnRecord;
import io.pravega.controller.store.stream.tables.Data;
import io.pravega.controller.store.stream.tables.State;
import io.pravega.controller.store.stream.tables.StreamTruncationRecord;
import io.pravega.controller.store.stream.tables.TableHelper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.utils.ZKPaths;

/* loaded from: input_file:io/pravega/controller/store/stream/ZKStream.class */
class ZKStream extends PersistentStreamBase<Integer> {
    private static final String SCOPE_PATH = "/store/%s";
    private static final String STREAM_PATH = "/store/%s/%s";
    private static final String CREATION_TIME_PATH = "/store/%s/%s/creationTime";
    private static final String CONFIGURATION_PATH = "/store/%s/%s/configuration";
    private static final String TRUNCATION_PATH = "/store/%s/%s/truncation";
    private static final String STATE_PATH = "/store/%s/%s/state";
    private static final String SEGMENT_PATH = "/store/%s/%s/segment";
    private static final String HISTORY_PATH = "/store/%s/%s/history";
    private static final String INDEX_PATH = "/store/%s/%s/index";
    private static final String RETENTION_PATH = "/store/%s/%s/retention";
    private static final String MARKER_PATH = "/store/%s/%s/markers";
    private final ZKStoreHelper store;
    private final String creationPath;
    private final String configurationPath;
    private final String truncationPath;
    private final String statePath;
    private final String segmentPath;
    private final String historyPath;
    private final String indexPath;
    private final String retentionPath;
    private final String activeTxRoot;
    private final String completedTxPath;
    private final String markerPath;
    private final String scopePath;
    private final String streamPath;
    private final Cache<Integer> cache;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ZKStream(String str, String str2, ZKStoreHelper zKStoreHelper) {
        super(str, str2);
        this.store = zKStoreHelper;
        this.scopePath = String.format(SCOPE_PATH, str);
        this.streamPath = String.format(STREAM_PATH, str, str2);
        this.creationPath = String.format(CREATION_TIME_PATH, str, str2);
        this.configurationPath = String.format(CONFIGURATION_PATH, str, str2);
        this.truncationPath = String.format(TRUNCATION_PATH, str, str2);
        this.statePath = String.format(STATE_PATH, str, str2);
        this.segmentPath = String.format(SEGMENT_PATH, str, str2);
        this.historyPath = String.format(HISTORY_PATH, str, str2);
        this.indexPath = String.format(INDEX_PATH, str, str2);
        this.retentionPath = String.format(RETENTION_PATH, str, str2);
        this.activeTxRoot = String.format("/transactions/activeTx/%s/%s", str, str2);
        this.completedTxPath = String.format("/transactions/completedTx/%s/%s", str, str2);
        this.markerPath = String.format(MARKER_PATH, str, str2);
        ZKStoreHelper zKStoreHelper2 = this.store;
        zKStoreHelper2.getClass();
        this.cache = new Cache<>(zKStoreHelper2::getData);
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Integer> getNumberOfOngoingTransactions() {
        return this.store.getChildren(this.activeTxRoot).thenCompose(list -> {
            return Futures.allOfWithResults((List) list.stream().map(str -> {
                return getNumberOfOngoingTransactions(Integer.parseInt(str));
            }).collect(Collectors.toList()));
        }).thenApply((Function<? super U, ? extends U>) list2 -> {
            return (Integer) list2.stream().reduce(0, (v0, v1) -> {
                return Integer.sum(v0, v1);
            });
        });
    }

    private CompletableFuture<Integer> getNumberOfOngoingTransactions(int i) {
        return this.store.getChildren(getEpochPath(i)).thenApply((v0) -> {
            return v0.size();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public void refresh() {
        this.cache.invalidateAll();
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> deleteStream() {
        return this.store.deleteTree(this.streamPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j) {
        return this.store.checkExists(this.creationPath).thenCompose(bool -> {
            return !bool.booleanValue() ? CompletableFuture.completedFuture(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, j)) : getCreationTime().thenCompose(l -> {
                return this.store.checkExists(this.configurationPath).thenCompose(bool -> {
                    if (bool.booleanValue()) {
                        return handleConfigExists(l.longValue(), l.longValue() == j);
                    }
                    return CompletableFuture.completedFuture(new CreateStreamResponse(CreateStreamResponse.CreateStatus.NEW, streamConfiguration, l.longValue()));
                });
            });
        });
    }

    private CompletableFuture<CreateStreamResponse> handleConfigExists(long j, boolean z) {
        CreateStreamResponse.CreateStatus createStatus = z ? CreateStreamResponse.CreateStatus.NEW : CreateStreamResponse.CreateStatus.EXISTS_CREATING;
        return getConfiguration().thenCompose(streamConfiguration -> {
            return this.store.checkExists(this.statePath).thenCompose(bool -> {
                return !bool.booleanValue() ? CompletableFuture.completedFuture(new CreateStreamResponse(createStatus, streamConfiguration, j)) : getState(false).thenApply(state -> {
                    return (state.equals(State.UNKNOWN) || state.equals(State.CREATING)) ? new CreateStreamResponse(createStatus, streamConfiguration, j) : new CreateStreamResponse(CreateStreamResponse.CreateStatus.EXISTS_ACTIVE, streamConfiguration, j);
                });
            });
        });
    }

    private CompletableFuture<Long> getCreationTime() {
        return this.cache.getCachedData(this.creationPath).thenApply(data -> {
            return Long.valueOf(BitConverter.readLong(data.getData(), 0));
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> checkScopeExists() {
        return this.store.checkExists(this.scopePath).thenAccept(bool -> {
            if (!bool.booleanValue()) {
                throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, this.scopePath);
            }
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createRetentionSet(byte[] bArr) {
        return this.store.createZNodeIfNotExist(this.retentionPath, bArr);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getRetentionSet() {
        return this.store.getData(this.retentionPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateRetentionSet(Data<Integer> data) {
        return this.store.setData(this.retentionPath, data);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> storeCreationTimeIfAbsent(long j) {
        byte[] bArr = new byte[8];
        BitConverter.writeLong(bArr, 0, j);
        return this.store.createZNodeIfNotExist(this.creationPath, bArr).thenApply(r4 -> {
            return this.cache.invalidateCache(this.creationPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createConfigurationIfAbsent(StreamProperty<StreamConfiguration> streamProperty) {
        return this.store.createZNodeIfNotExist(this.configurationPath, SerializationUtils.serialize(streamProperty)).thenApply(r4 -> {
            return this.cache.invalidateCache(this.configurationPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createStateIfAbsent(State state) {
        return this.store.createZNodeIfNotExist(this.statePath, SerializationUtils.serialize(state)).thenApply(r4 -> {
            return this.cache.invalidateCache(this.statePath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createSegmentTableIfAbsent(Data<Integer> data) {
        return this.store.createZNodeIfNotExist(this.segmentPath, data.getData()).thenApply(r4 -> {
            return this.cache.invalidateCache(this.segmentPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createIndexTableIfAbsent(Data<Integer> data) {
        return this.store.createZNodeIfNotExist(this.indexPath, data.getData()).thenApply(r4 -> {
            return this.cache.invalidateCache(this.indexPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createHistoryTableIfAbsent(Data<Integer> data) {
        return this.store.createZNodeIfNotExist(this.historyPath, data.getData()).thenApply(r4 -> {
            return this.cache.invalidateCache(this.historyPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> updateHistoryTable(Data<Integer> data) {
        return this.store.setData(this.historyPath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.historyPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createMarkerData(int i, long j) {
        String makePath = ZKPaths.makePath(this.markerPath, String.format("%d", Integer.valueOf(i)));
        byte[] bArr = new byte[8];
        BitConverter.writeLong(bArr, 0, j);
        return this.store.createZNodeIfNotExist(makePath, bArr).thenAccept(r4 -> {
            this.cache.invalidateCache(this.markerPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateMarkerData(int i, Data<Integer> data) {
        String makePath = ZKPaths.makePath(this.markerPath, String.format("%d", Integer.valueOf(i)));
        return this.store.setData(makePath, data).whenComplete((r5, th) -> {
            this.cache.invalidateCache(makePath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getMarkerData(int i) {
        CompletableFuture<Data<Integer>> completableFuture = new CompletableFuture<>();
        this.cache.getCachedData(ZKPaths.makePath(this.markerPath, String.format("%d", Integer.valueOf(i)))).whenComplete((data, th) -> {
            if (th == null) {
                completableFuture.complete(data);
                return;
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof StoreException.DataNotFoundException) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(unwrap);
            }
        });
        return completableFuture;
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeMarkerData(int i) {
        String makePath = ZKPaths.makePath(this.markerPath, String.format("%d", Integer.valueOf(i)));
        return this.store.deletePath(makePath, false).whenComplete((r5, th) -> {
            this.cache.invalidateCache(makePath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Map<String, Data<Integer>>> getCurrentTxns() {
        return getActiveEpoch(false).thenCompose(pair -> {
            return this.store.getChildren(getEpochPath(((Integer) pair.getKey()).intValue())).thenCompose(list -> {
                return Futures.allOfWithResults((Map) list.stream().collect(Collectors.toMap(str -> {
                    return str;
                }, str2 -> {
                    return this.cache.getCachedData(getActiveTxPath(((Integer) pair.getKey()).intValue(), str2));
                })));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Integer> createNewTransaction(UUID uuid, long j, long j2, long j3, long j4) {
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        createNewTransactionNode(uuid, j, j2, j3, j4).whenComplete((num, th) -> {
            if (th == null) {
                completableFuture.complete(num);
            } else if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                Futures.completeAfter(() -> {
                    return createNewTransactionNode(uuid, j, j2, j3, j4);
                }, completableFuture);
            } else {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    private CompletableFuture<Integer> createNewTransactionNode(UUID uuid, long j, long j2, long j3, long j4) {
        return getLatestEpoch().thenCompose(pair -> {
            String activeTxPath = getActiveTxPath(((Integer) pair.getKey()).intValue(), uuid.toString());
            return this.store.createZNodeIfNotExist(activeTxPath, new ActiveTxnRecord(j, j2, j3, j4, TxnStatus.OPEN).toByteArray(), false).thenApply(r5 -> {
                return this.cache.invalidateCache(activeTxPath);
            }).thenApply((Function<? super U, ? extends U>) r3 -> {
                return (Integer) pair.getKey();
            });
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Integer> getTransactionEpoch(UUID uuid) {
        return this.store.getChildren(this.activeTxRoot).thenCompose(list -> {
            HashMap hashMap = new HashMap();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                hashMap.put((String) it.next(), this.store.checkExists(getActiveTxPath(Integer.parseInt(r0), uuid.toString())));
            }
            return Futures.allOfWithResults(hashMap);
        }).thenApply((Function<? super U, ? extends U>) map -> {
            Optional findFirst = map.entrySet().stream().filter((v0) -> {
                return v0.getValue();
            }).findFirst();
            if (findFirst.isPresent()) {
                return Integer.valueOf(Integer.parseInt((String) ((Map.Entry) findFirst.get()).getKey()));
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getActiveTx(int i, UUID uuid) {
        return this.store.getData(getActiveTxPath(i, uuid.toString()));
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateActiveTx(int i, UUID uuid, Data<Integer> data) {
        String activeTxPath = getActiveTxPath(i, uuid.toString());
        return this.store.setData(activeTxPath, data).whenComplete((r5, th) -> {
            this.cache.invalidateCache(activeTxPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> sealActiveTx(int i, UUID uuid, boolean z, ActiveTxnRecord activeTxnRecord, int i2) {
        String activeTxPath = getActiveTxPath(i, uuid.toString());
        return this.store.setData(activeTxPath, new Data<>(new ActiveTxnRecord(activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getLeaseExpiryTime(), activeTxnRecord.getMaxExecutionExpiryTime(), activeTxnRecord.getScaleGracePeriod(), z ? TxnStatus.COMMITTING : TxnStatus.ABORTING).toByteArray(), Integer.valueOf(i2))).thenApply(r5 -> {
            return this.cache.invalidateCache(activeTxPath);
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r52, th) -> {
            this.cache.invalidateCache(activeTxPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getCompletedTx(UUID uuid) {
        return this.cache.getCachedData(getCompletedTxPath(uuid.toString()));
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid) {
        String activeTxPath = getActiveTxPath(i, uuid.toString());
        return this.store.checkExists(activeTxPath).thenCompose(bool -> {
            return bool.booleanValue() ? this.store.deletePath(activeTxPath, false).whenComplete((r5, th) -> {
                this.cache.invalidateCache(activeTxPath);
            }) : CompletableFuture.completedFuture(null);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createCompletedTxEntry(UUID uuid, TxnStatus txnStatus, long j) {
        String completedTxPath = getCompletedTxPath(uuid.toString());
        return this.store.createZNodeIfNotExist(completedTxPath, new CompletedTxnRecord(j, txnStatus).toByteArray()).whenComplete((r5, th) -> {
            this.cache.invalidateCache(completedTxPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Void> createTruncationDataIfAbsent(StreamProperty<StreamTruncationRecord> streamProperty) {
        return this.store.createZNodeIfNotExist(this.truncationPath, SerializationUtils.serialize(streamProperty)).thenApply(r4 -> {
            return this.cache.invalidateCache(this.truncationPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> setTruncationData(Data<Integer> data) {
        return this.store.setData(this.truncationPath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.truncationPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getTruncationData(boolean z) {
        if (z) {
            this.cache.invalidateCache(this.truncationPath);
        }
        return this.cache.getCachedData(this.truncationPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> setConfigurationData(Data<Integer> data) {
        return this.store.setData(this.configurationPath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.configurationPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getConfigurationData(boolean z) {
        if (z) {
            this.cache.invalidateCache(this.configurationPath);
        }
        return this.cache.getCachedData(this.configurationPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> setStateData(Data<Integer> data) {
        return this.store.setData(this.statePath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.statePath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getStateData(boolean z) {
        if (z) {
            this.cache.invalidateCache(this.statePath);
        }
        return this.cache.getCachedData(this.statePath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Segment> getSegmentRow(int i) {
        return getSegmentTable().thenApply(data -> {
            return TableHelper.getSegment(i, data.getData());
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Data<Integer>> getSegmentTable() {
        return this.cache.getCachedData(this.segmentPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getSegmentTableFromStore() {
        this.cache.invalidateCache(this.segmentPath);
        return getSegmentTable();
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> setSegmentTable(Data<Integer> data) {
        return this.store.setData(this.segmentPath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.segmentPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Data<Integer>> getHistoryTable() {
        return this.cache.getCachedData(this.historyPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Data<Integer>> getHistoryTableFromStore() {
        this.cache.invalidateCache(this.historyPath);
        return getHistoryTable();
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> createEpochNodeIfAbsent(int i) {
        return this.store.createZNodeIfNotExist(getEpochPath(i));
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> deleteEpochNode(int i) {
        String epochPath = getEpochPath(i);
        return this.store.deletePath(epochPath, false).thenAccept(r5 -> {
            this.cache.invalidateCache(epochPath);
        });
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    public CompletableFuture<Data<Integer>> getIndexTable() {
        return this.cache.getCachedData(this.indexPath);
    }

    @Override // io.pravega.controller.store.stream.PersistentStreamBase
    CompletableFuture<Void> updateIndexTable(Data<Integer> data) {
        return this.store.setData(this.indexPath, data).whenComplete((r4, th) -> {
            this.cache.invalidateCache(this.indexPath);
        });
    }

    private String getActiveTxPath(long j, String str) {
        return ZKPaths.makePath(ZKPaths.makePath(this.activeTxRoot, Long.toString(j)), str);
    }

    private String getEpochPath(long j) {
        return ZKPaths.makePath(this.activeTxRoot, Long.toString(j));
    }

    private String getCompletedTxPath(String str) {
        return ZKPaths.makePath(this.completedTxPath, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @SuppressFBWarnings(justification = "generated code")
    public String getStreamPath() {
        return this.streamPath;
    }
}
