package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.CollectionHelpers;
import io.pravega.controller.metrics.TransactionMetrics;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.Version;
import io.pravega.controller.store.VersionedMetadata;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.HistoryTimeSeriesRecord;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.shared.NameUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase.class */
public abstract class PersistentStreamBase implements Stream {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log;
    private final String scope;
    private final String name;
    private final AtomicInteger historyChunkSize;
    private final AtomicInteger shardSize;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.pravega.controller.store.stream.PersistentStreamBase$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$TxnStatus = new int[TxnStatus.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.OPEN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.UNKNOWN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentStreamBase(String str, String str2, int i, int i2) {
        this.scope = str;
        this.name = str2;
        this.historyChunkSize = new AtomicInteger(i);
        this.shardSize = new AtomicInteger(i2);
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getScope() {
        return this.scope;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getName() {
        return this.name;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getScopeName() {
        return this.scope;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<CreateStreamResponse> create(StreamConfiguration streamConfiguration, long j, int i) {
        return checkStreamExists(streamConfiguration, j, i).thenCompose(createStreamResponse -> {
            return createStreamMetadata().thenCompose(r6 -> {
                return storeCreationTimeIfAbsent(createStreamResponse.getTimestamp());
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r7 -> {
                return createConfigurationIfAbsent(StreamConfigurationRecord.complete(this.scope, this.name, createStreamResponse.getConfiguration()));
            }).thenCompose(r4 -> {
                return createEpochTransitionIfAbsent(EpochTransitionRecord.EMPTY);
            }).thenCompose(r42 -> {
                return createTruncationDataIfAbsent(StreamTruncationRecord.EMPTY);
            }).thenCompose(r43 -> {
                return createCommitTxnRecordIfAbsent(CommittingTransactionsRecord.EMPTY);
            }).thenCompose(r5 -> {
                return createStateIfAbsent(StateRecord.builder().state(State.CREATING).m182build());
            }).thenCompose(r72 -> {
                return createHistoryRecords(i, createStreamResponse);
            }).thenApply(r3 -> {
                return createStreamResponse;
            });
        });
    }

    private CompletionStage<Void> createHistoryRecords(int i, CreateStreamResponse createStreamResponse) {
        int minNumSegments = createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments();
        double d = 1.0d / minNumSegments;
        long timestamp = createStreamResponse.getTimestamp();
        ImmutableList.Builder builder = ImmutableList.builder();
        IntStream.range(0, minNumSegments).boxed().forEach(num -> {
            builder.add(newSegmentRecord(0, i + num.intValue(), timestamp, Double.valueOf(num.intValue() * d), Double.valueOf((num.intValue() + 1) * d)));
        });
        EpochRecord epochRecord = new EpochRecord(0, 0, builder.build(), timestamp);
        return createEpochRecord(epochRecord).thenCompose(r5 -> {
            return createHistoryChunk(epochRecord);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r4 -> {
            return createSealedSegmentSizeMapShardIfAbsent(0);
        }).thenCompose(r6 -> {
            return createRetentionSetDataIfAbsent(new RetentionSet(ImmutableList.of()));
        }).thenCompose(r52 -> {
            return createCurrentEpochRecordDataIfAbsent(epochRecord);
        });
    }

    private CompletionStage<Void> createHistoryChunk(EpochRecord epochRecord) {
        return createHistoryTimeSeriesChunk(0, new HistoryTimeSeriesRecord(0, 0, ImmutableList.of(), epochRecord.getSegments(), epochRecord.getCreationTime()));
    }

    private CompletableFuture<Void> createHistoryTimeSeriesChunk(int i, HistoryTimeSeriesRecord historyTimeSeriesRecord) {
        return createHistoryTimeSeriesChunkDataIfAbsent(i, new HistoryTimeSeries(ImmutableList.builder().add(historyTimeSeriesRecord).build()));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> delete() {
        return deleteStream();
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startTruncation(Map<Long, Long> map) {
        return getTruncationRecord().thenCompose(versionedMetadata -> {
            Preconditions.checkNotNull(versionedMetadata);
            Preconditions.checkArgument(!((StreamTruncationRecord) versionedMetadata.getObject()).isUpdating());
            return isStreamCutValid(map).thenCompose(bool -> {
                Exceptions.checkArgument(bool.booleanValue(), "streamCut", "invalid stream cut", new Object[0]);
                return computeStreamCutSpan(map).thenCompose(immutableMap -> {
                    StreamTruncationRecord streamTruncationRecord = (StreamTruncationRecord) versionedMetadata.getObject();
                    Exceptions.checkArgument(greaterThan(map, immutableMap, streamTruncationRecord.getStreamCut(), streamTruncationRecord.getSpan()), "StreamCut", "Supplied streamcut is behind previous truncation point", new Object[0]);
                    return computeTruncationRecord(streamTruncationRecord, map, immutableMap).thenCompose(streamTruncationRecord2 -> {
                        return Futures.toVoid(setTruncationData(new VersionedMetadata<>(streamTruncationRecord2, versionedMetadata.getVersion())));
                    });
                });
            });
        });
    }

    private boolean greaterThan(Map<Long, Long> map, Map<StreamSegmentRecord, Integer> map2, Map<Long, Long> map3, Map<StreamSegmentRecord, Integer> map4) {
        return map2.entrySet().stream().allMatch(entry -> {
            return map4.entrySet().stream().noneMatch(entry -> {
                return (((StreamSegmentRecord) entry.getKey()).segmentId() == ((StreamSegmentRecord) entry.getKey()).segmentId() && ((Long) map.get(Long.valueOf(((StreamSegmentRecord) entry.getKey()).segmentId()))).longValue() < ((Long) map3.get(Long.valueOf(((StreamSegmentRecord) entry.getKey()).segmentId()))).longValue()) || (((StreamSegmentRecord) entry.getKey()).overlaps((StreamSegmentRecord) entry.getKey()) && ((Integer) entry.getValue()).intValue() < ((Integer) entry.getValue()).intValue());
            });
        });
    }

    private CompletableFuture<StreamTruncationRecord> computeTruncationRecord(StreamTruncationRecord streamTruncationRecord, Map<Long, Long> map, ImmutableMap<StreamSegmentRecord, Integer> immutableMap) {
        log.debug("computing truncation for stream {}/{}", this.scope, this.name);
        return (streamTruncationRecord.getSpan().isEmpty() ? getEpochRecord(0).thenApply(epochRecord -> {
            return convertToSpan(epochRecord);
        }) : CompletableFuture.completedFuture(streamTruncationRecord.getSpan())).thenCompose(map2 -> {
            return segmentsBetweenStreamCutSpans(map2, immutableMap);
        }).thenCompose(immutableSet -> {
            return sizeBetweenStreamCuts(streamTruncationRecord.getStreamCut(), map, immutableSet).thenApply(l -> {
                ImmutableSet.Builder builder = ImmutableSet.builder();
                java.util.stream.Stream filter = immutableSet.stream().map((v0) -> {
                    return v0.segmentId();
                }).filter(l -> {
                    return !map.containsKey(l);
                });
                builder.getClass();
                filter.forEach((v1) -> {
                    r1.add(v1);
                });
                return new StreamTruncationRecord(ImmutableMap.copyOf(map), immutableMap, streamTruncationRecord.getDeletedSegments(), builder.build(), streamTruncationRecord.getSizeTill() + l.longValue(), true);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeTruncation(VersionedMetadata<StreamTruncationRecord> versionedMetadata) {
        Preconditions.checkNotNull(versionedMetadata);
        Preconditions.checkArgument(versionedMetadata.getObject().isUpdating());
        StreamTruncationRecord object = versionedMetadata.getObject();
        return object.isUpdating() ? Futures.toVoid(setTruncationData(new VersionedMetadata<>(StreamTruncationRecord.complete(object), versionedMetadata.getVersion()))) : CompletableFuture.completedFuture(null);
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord() {
        return getTruncationData(true).thenApply(versionedMetadata -> {
            return new VersionedMetadata((StreamTruncationRecord) versionedMetadata.getObject(), versionedMetadata.getVersion());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startUpdateConfiguration(StreamConfiguration streamConfiguration) {
        return getVersionedConfigurationRecord().thenCompose(versionedMetadata -> {
            Preconditions.checkArgument(!((StreamConfigurationRecord) versionedMetadata.getObject()).isUpdating());
            return Futures.toVoid(setConfigurationData(new VersionedMetadata<>(StreamConfigurationRecord.update(this.scope, this.name, streamConfiguration), versionedMetadata.getVersion())));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeUpdateConfiguration(VersionedMetadata<StreamConfigurationRecord> versionedMetadata) {
        StreamConfigurationRecord object = versionedMetadata.getObject();
        Preconditions.checkNotNull(object);
        if (!object.isUpdating()) {
            return CompletableFuture.completedFuture(null);
        }
        StreamConfigurationRecord complete = StreamConfigurationRecord.complete(this.scope, this.name, object.getStreamConfiguration());
        log.debug("Completing update configuration for stream {}/{}", this.scope, this.name);
        return Futures.toVoid(setConfigurationData(new VersionedMetadata<>(complete, versionedMetadata.getVersion())));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        return getConfigurationData(false).thenApply(versionedMetadata -> {
            return ((StreamConfigurationRecord) versionedMetadata.getObject()).getStreamConfiguration();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getVersionedConfigurationRecord() {
        return getConfigurationData(true).thenApply(versionedMetadata -> {
            return new VersionedMetadata(versionedMetadata.getObject(), versionedMetadata.getVersion());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> updateState(State state) {
        return getStateData(true).thenCompose(versionedMetadata -> {
            return Futures.toVoid(updateVersionedState(new VersionedMetadata<>(((StateRecord) versionedMetadata.getObject()).getState(), versionedMetadata.getVersion()), state));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<State>> getVersionedState() {
        return getStateData(true).thenApply(versionedMetadata -> {
            return new VersionedMetadata(((StateRecord) versionedMetadata.getObject()).getState(), versionedMetadata.getVersion());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(VersionedMetadata<State> versionedMetadata, State state) {
        return State.isTransitionAllowed(versionedMetadata.getObject(), state) ? setStateData(new VersionedMetadata<>(StateRecord.builder().state(state).m182build(), versionedMetadata.getVersion())).thenApply(version -> {
            return new VersionedMetadata(state, version);
        }) : Futures.failedFuture(StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Stream: " + getName() + " State: " + state.name() + " current state = " + versionedMetadata.getObject()));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<State> getState(boolean z) {
        return getStateData(z).thenApply(versionedMetadata -> {
            return ((StateRecord) versionedMetadata.getObject()).getState();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamSegmentRecord> getSegment(long j) {
        return getEpochRecord(NameUtils.getEpoch(j)).thenApply(epochRecord -> {
            return (StreamSegmentRecord) epochRecord.getSegments().stream().filter(streamSegmentRecord -> {
                return streamSegmentRecord.segmentId() == j;
            }).findAny().orElseThrow(() -> {
                return StoreException.create(StoreException.Type.DATA_NOT_FOUND, "segment not found in epoch");
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(long j, long j2) {
        CompletableFuture<Integer> findEpochAtTime = findEpochAtTime(j, false);
        CompletableFuture<Integer> findEpochAtTime2 = findEpochAtTime(j2, false);
        return CompletableFuture.allOf(findEpochAtTime, findEpochAtTime2).thenCompose(r8 -> {
            return fetchEpochs(((Integer) findEpochAtTime.join()).intValue(), ((Integer) findEpochAtTime2.join()).intValue(), false);
        }).thenApply((Function<? super U, ? extends U>) this::mapToScaleMetadata);
    }

    private List<ScaleMetadata> mapToScaleMetadata(List<EpochRecord> list) {
        AtomicReference atomicReference = new AtomicReference();
        return (List) list.stream().map(epochRecord -> {
            long j = 0;
            long j2 = 0;
            ImmutableList<StreamSegmentRecord> segments = epochRecord.getSegments();
            if (atomicReference.get() != null) {
                j = findSegmentSplitsMerges((List) atomicReference.get(), segments);
                j2 = findSegmentSplitsMerges(segments, (List) atomicReference.get());
            }
            atomicReference.set(segments);
            return new ScaleMetadata(epochRecord.getCreationTime(), transform((List<StreamSegmentRecord>) segments), j, j2);
        }).collect(Collectors.toList());
    }

    private long findSegmentSplitsMerges(List<StreamSegmentRecord> list, List<StreamSegmentRecord> list2) {
        return list.stream().filter(streamSegmentRecord -> {
            return list2.stream().filter(streamSegmentRecord -> {
                return streamSegmentRecord.overlaps(streamSegmentRecord);
            }).count() > 1;
        }).count();
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Integer> getSegmentSealedEpoch(long j) {
        return getSegmentSealedRecordData(j).handle((versionedMetadata, th) -> {
            if (th == null) {
                return (Integer) versionedMetadata.getObject();
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return -1;
            }
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Set<Long>> getAllSegmentIds() {
        CompletableFuture thenCompose = getTruncationRecord().thenCompose(versionedMetadata -> {
            return ((StreamTruncationRecord) versionedMetadata.getObject()).equals(StreamTruncationRecord.EMPTY) ? getEpochRecord(0).thenApply(this::convertToSpan) : CompletableFuture.completedFuture(((StreamTruncationRecord) versionedMetadata.getObject()).getSpan());
        });
        CompletableFuture thenApply = getActiveEpoch(true).thenApply(this::convertToSpan);
        return CompletableFuture.allOf(thenCompose, thenApply).thenCompose(r7 -> {
            return segmentsBetweenStreamCutSpans((Map) thenCompose.join(), (Map) thenApply.join()).thenApply(immutableSet -> {
                return (Set) immutableSet.stream().map((v0) -> {
                    return v0.segmentId();
                }).collect(Collectors.toSet());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<StreamSegmentRecord, List<Long>>> getSuccessorsWithPredecessors(long j) {
        return getSegmentSealedEpoch(j).thenCompose(num -> {
            if (num.intValue() < 0) {
                return getActiveEpoch(true).thenApply(epochRecord -> {
                    return Collections.emptyMap();
                });
            }
            CompletableFuture<EpochRecord> epochRecord2 = getEpochRecord(num.intValue());
            CompletableFuture<EpochRecord> epochRecord3 = getEpochRecord(num.intValue() - 1);
            return CompletableFuture.allOf(epochRecord2, epochRecord3).thenApply(r8 -> {
                EpochRecord epochRecord4 = (EpochRecord) epochRecord2.join();
                EpochRecord epochRecord5 = (EpochRecord) epochRecord3.join();
                Optional findAny = epochRecord5.getSegments().stream().filter(streamSegmentRecord -> {
                    return streamSegmentRecord.segmentId() == j;
                }).findAny();
                if (!$assertionsDisabled && !findAny.isPresent()) {
                    throw new AssertionError();
                }
                StreamSegmentRecord streamSegmentRecord2 = (StreamSegmentRecord) findAny.get();
                return (Map) ((List) epochRecord4.getSegments().stream().filter(streamSegmentRecord3 -> {
                    return streamSegmentRecord3.overlaps(streamSegmentRecord2);
                }).collect(Collectors.toList())).stream().collect(Collectors.toMap(streamSegmentRecord4 -> {
                    return streamSegmentRecord4;
                }, streamSegmentRecord5 -> {
                    return (List) epochRecord5.getSegments().stream().filter(streamSegmentRecord5 -> {
                        return streamSegmentRecord5.overlaps(streamSegmentRecord5);
                    }).map((v0) -> {
                        return v0.segmentId();
                    }).collect(Collectors.toList());
                }));
            });
        });
    }

    private CompletableFuture<EpochRecord> getActiveEpochRecord(boolean z) {
        return getCurrentEpochRecordData(z).thenApply((v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<StreamSegmentRecord>> getActiveSegments() {
        return verifyLegalState().thenCompose(r4 -> {
            return getActiveEpochRecord(true).thenApply(epochRecord -> {
                return epochRecord.getSegments();
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<StreamSegmentRecord, Long>> getSegmentsAtHead() {
        return getTruncationRecord().thenCompose(versionedMetadata -> {
            return ((StreamTruncationRecord) versionedMetadata.getObject()).equals(StreamTruncationRecord.EMPTY) ? getSegmentsInEpoch(0).thenApply(list -> {
                return (Map) list.stream().collect(Collectors.toMap(streamSegmentRecord -> {
                    return streamSegmentRecord;
                }, streamSegmentRecord2 -> {
                    return 0L;
                }));
            }) : CompletableFuture.completedFuture(((StreamTruncationRecord) versionedMetadata.getObject()).getStreamCut().entrySet().stream().collect(Collectors.toMap(entry -> {
                return (StreamSegmentRecord) ((StreamTruncationRecord) versionedMetadata.getObject()).getSpan().keySet().stream().filter(streamSegmentRecord -> {
                    return streamSegmentRecord.segmentId() == ((Long) entry.getKey()).longValue();
                }).findFirst().get();
            }, (v0) -> {
                return v0.getValue();
            })));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsInEpoch(int i) {
        return getEpochRecord(i).thenApply(epochRecord -> {
            return epochRecord.getSegments();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<StreamSegmentRecord>> getSegmentsBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2) {
        return segmentsBetweenStreamCuts(map, map2).thenApply((v1) -> {
            return new ArrayList(v1);
        });
    }

    private CompletableFuture<ImmutableSet<StreamSegmentRecord>> segmentsBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2) {
        CompletableFuture thenApply = map.isEmpty() ? getEpochRecord(0).thenApply(this::convertToSpan) : computeStreamCutSpan(map);
        CompletableFuture thenApply2 = map2.isEmpty() ? getActiveEpochRecord(true).thenApply(this::convertToSpan) : computeStreamCutSpan(map2);
        return CompletableFuture.allOf(thenApply, thenApply2).thenCompose(r10 -> {
            if (!map.isEmpty() && !map2.isEmpty()) {
                Preconditions.checkArgument(RecordHelper.streamCutComparator(map2, (Map) thenApply2.join(), map, (Map) thenApply.join()));
            }
            return segmentsBetweenStreamCutSpans((Map) thenApply.join(), (Map) thenApply2.join());
        });
    }

    @VisibleForTesting
    CompletableFuture<ImmutableSet<StreamSegmentRecord>> segmentsBetweenStreamCutSpans(Map<StreamSegmentRecord, Integer> map, Map<StreamSegmentRecord, Integer> map2) {
        int intValue = ((Integer) Collections.min(map2.values())).intValue();
        int intValue2 = ((Integer) Collections.max(map2.values())).intValue();
        int intValue3 = ((Integer) Collections.min(map.values())).intValue();
        int intValue4 = ((Integer) Collections.max(map.values())).intValue();
        HashSet hashSet = new HashSet();
        return fetchEpochs(intValue3, intValue2, true).thenAccept(list -> {
            list.forEach(epochRecord -> {
                if (epochRecord.getEpoch() < intValue4 || epochRecord.getEpoch() > intValue) {
                    epochRecord.getSegments().stream().filter(streamSegmentRecord -> {
                        return !hashSet.contains(streamSegmentRecord);
                    }).forEach(streamSegmentRecord2 -> {
                        boolean allMatch = map.keySet().stream().filter(streamSegmentRecord2 -> {
                            return streamSegmentRecord2.overlaps(streamSegmentRecord2);
                        }).allMatch(streamSegmentRecord3 -> {
                            return streamSegmentRecord3.segmentId() <= streamSegmentRecord2.segmentId();
                        });
                        boolean allMatch2 = map2.keySet().stream().filter(streamSegmentRecord4 -> {
                            return streamSegmentRecord4.overlaps(streamSegmentRecord2);
                        }).allMatch(streamSegmentRecord5 -> {
                            return streamSegmentRecord2.segmentId() <= streamSegmentRecord5.segmentId();
                        });
                        if (allMatch && allMatch2) {
                            hashSet.add(streamSegmentRecord2);
                        }
                    });
                } else {
                    hashSet.addAll(epochRecord.getSegments());
                }
            });
        }).thenApply(r3 -> {
            return ImmutableSet.copyOf(hashSet);
        });
    }

    @VisibleForTesting
    CompletableFuture<Long> sizeBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2, Set<StreamSegmentRecord> set) {
        return Futures.allOfWithResults((List) ((Map) set.stream().collect(Collectors.groupingBy(streamSegmentRecord -> {
            return Integer.valueOf(getShardNumber(streamSegmentRecord.segmentId()));
        }))).entrySet().stream().map(entry -> {
            return getSealedSegmentSizeMapShard(((Integer) entry.getKey()).intValue()).thenApply(sealedSegmentsMapShard -> {
                return (Map) ((List) entry.getValue()).stream().collect(Collectors.toMap(streamSegmentRecord2 -> {
                    return streamSegmentRecord2;
                }, streamSegmentRecord3 -> {
                    if (sealedSegmentsMapShard.getSize(streamSegmentRecord3.segmentId()) == null) {
                        return Long.MIN_VALUE;
                    }
                    return sealedSegmentsMapShard.getSize(streamSegmentRecord3.segmentId());
                }));
            });
        }).collect(Collectors.toList())).thenApply(list -> {
            return (Map) list.stream().flatMap(map3 -> {
                return map3.entrySet().stream();
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
        }).thenApply(map3 -> {
            AtomicLong atomicLong = new AtomicLong(0L);
            map3.forEach((streamSegmentRecord2, l) -> {
                if (map2.containsKey(Long.valueOf(streamSegmentRecord2.segmentId())) && map.containsKey(Long.valueOf(streamSegmentRecord2.segmentId()))) {
                    atomicLong.addAndGet(((Long) map2.get(Long.valueOf(streamSegmentRecord2.segmentId()))).longValue() - ((Long) map.get(Long.valueOf(streamSegmentRecord2.segmentId()))).longValue());
                    return;
                }
                if (map2.containsKey(Long.valueOf(streamSegmentRecord2.segmentId()))) {
                    atomicLong.addAndGet(((Long) map2.get(Long.valueOf(streamSegmentRecord2.segmentId()))).longValue());
                    return;
                }
                if (map.containsKey(Long.valueOf(streamSegmentRecord2.segmentId()))) {
                    if (!$assertionsDisabled && l.longValue() < 0) {
                        throw new AssertionError();
                    }
                    atomicLong.addAndGet(l.longValue() - ((Long) map.get(Long.valueOf(streamSegmentRecord2.segmentId()))).longValue());
                    return;
                }
                if (!$assertionsDisabled && l.longValue() < 0) {
                    throw new AssertionError();
                }
                atomicLong.addAndGet(l.longValue());
            });
            return Long.valueOf(atomicLong.get());
        });
    }

    @VisibleForTesting
    CompletableFuture<ImmutableMap<StreamSegmentRecord, Integer>> computeStreamCutSpan(Map<Long, Long> map) {
        long longValue = map.keySet().stream().max(Comparator.naturalOrder()).get().longValue();
        int epoch = NameUtils.getEpoch(map.keySet().stream().min(Comparator.naturalOrder()).get().longValue());
        int epoch2 = NameUtils.getEpoch(longValue);
        return fetchEpochs(epoch, epoch2, true).thenApply(list -> {
            ArrayList arrayList = new ArrayList(map.keySet());
            ImmutableMap.Builder builder = ImmutableMap.builder();
            for (int i = epoch2 - epoch; i >= 0 && !arrayList.isEmpty(); i--) {
                EpochRecord epochRecord = (EpochRecord) list.get(i);
                Set<Long> segmentIds = epochRecord.getSegmentIds();
                java.util.stream.Stream stream = arrayList.stream();
                segmentIds.getClass();
                builder.putAll((Map) ((List) stream.filter((v1) -> {
                    return r1.contains(v1);
                }).collect(Collectors.toList())).stream().collect(Collectors.toMap(l -> {
                    return (StreamSegmentRecord) epochRecord.getSegments().stream().filter(streamSegmentRecord -> {
                        return streamSegmentRecord.segmentId() == l.longValue();
                    }).findFirst().get();
                }, l2 -> {
                    return Integer.valueOf(epochRecord.getEpoch());
                })));
                arrayList.removeAll(segmentIds);
            }
            return builder.build();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Boolean> isStreamCutValid(Map<Long, Long> map) {
        return Futures.allOfWithResults((List) ((Map) map.keySet().stream().collect(Collectors.groupingBy((v0) -> {
            return NameUtils.getEpoch(v0);
        }))).entrySet().stream().map(entry -> {
            return getEpochRecord(((Integer) entry.getKey()).intValue()).thenApply(epochRecord -> {
                return (List) ((List) entry.getValue()).stream().map(l -> {
                    StreamSegmentRecord segment = epochRecord.getSegment(l.longValue());
                    return new AbstractMap.SimpleEntry(Double.valueOf(segment.getKeyStart()), Double.valueOf(segment.getKeyEnd()));
                }).collect(Collectors.toList());
            });
        }).collect(Collectors.toList())).thenApply(list -> {
            return (List) list.stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
        }).thenAccept(list2 -> {
            RecordHelper.validateStreamCut(new ArrayList(list2));
        }).handle((r6, th) -> {
            if (th == null) {
                return true;
            }
            if (Exceptions.unwrap(th) instanceof IllegalArgumentException) {
                return false;
            }
            log.warn("Exception while trying to validate a stream cut for stream {}/{}", this.scope, this.name);
            throw Exceptions.sneakyThrow(th);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(List<Long> list, List<Map.Entry<Double, Double>> list2, long j, VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return verifyNotSealed().thenCompose(r4 -> {
            return versionedMetadata == null ? getEpochTransition() : CompletableFuture.completedFuture(versionedMetadata);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) versionedMetadata2 -> {
            return getActiveEpochRecord(true).thenCompose(epochRecord -> {
                return getConfiguration().thenCompose(streamConfiguration -> {
                    if (!((EpochTransitionRecord) versionedMetadata2.getObject()).equals(EpochTransitionRecord.EMPTY)) {
                        if (RecordHelper.verifyRecordMatchesInput(list, list2, false, (EpochTransitionRecord) versionedMetadata2.getObject())) {
                            return CompletableFuture.completedFuture(versionedMetadata2);
                        }
                        log.debug("scale conflict, another scale operation is ongoing");
                        throw new EpochTransitionOperationExceptions.ConflictException();
                    }
                    if (!RecordHelper.canScaleFor(list, epochRecord)) {
                        return updateEpochTransitionNode(new VersionedMetadata<>(EpochTransitionRecord.EMPTY, versionedMetadata2.getVersion())).thenApply(version -> {
                            log.warn("scale precondition failed {}", list);
                            throw new EpochTransitionOperationExceptions.PreConditionFailureException();
                        });
                    }
                    if (!RecordHelper.validateInputRange(list, list2, epochRecord)) {
                        log.error("scale input invalid {} {}", list, list2);
                        throw new EpochTransitionOperationExceptions.InputInvalidException();
                    }
                    if ((epochRecord.getSegments().size() - list.size()) + list2.size() < streamConfiguration.getScalingPolicy().getMinNumSegments()) {
                        log.warn("Scale cannot be performed as Min Segment Count will not hold {} {}", list, list2);
                        throw new EpochTransitionOperationExceptions.PreConditionFailureException();
                    }
                    EpochTransitionRecord computeEpochTransition = RecordHelper.computeEpochTransition(epochRecord, list, list2, j);
                    return updateEpochTransitionNode(new VersionedMetadata<>(computeEpochTransition, versionedMetadata2.getVersion())).thenApply(version2 -> {
                        log.info("scale for stream {}/{} accepted. Segments to seal = {}", new Object[]{this.scope, this.name, computeEpochTransition.getSegmentsToSeal()});
                        return new VersionedMetadata(computeEpochTransition, version2);
                    });
                });
            });
        });
    }

    private CompletableFuture<Void> verifyNotSealed() {
        return getState(false).thenAccept(state -> {
            if (state.equals(State.SEALING) || state.equals(State.SEALED)) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + state.name());
            }
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(boolean z, VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2) {
        Preconditions.checkArgument(versionedMetadata2.getObject().equals(State.SCALING));
        return getCurrentEpochRecordData(true).thenCompose(versionedMetadata3 -> {
            EpochRecord epochRecord = (EpochRecord) versionedMetadata3.getObject();
            return z ? migrateManualScaleToNewEpoch(versionedMetadata, versionedMetadata2, epochRecord) : discardInconsistentEpochTransition(versionedMetadata, versionedMetadata2, epochRecord);
        });
    }

    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> discardInconsistentEpochTransition(VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, EpochRecord epochRecord) {
        return versionedMetadata.getObject().getNewEpoch() > epochRecord.getEpoch() ? CompletableFuture.completedFuture(versionedMetadata) : updateEpochTransitionNode(new VersionedMetadata<>(EpochTransitionRecord.EMPTY, versionedMetadata.getVersion())).thenCompose(version -> {
            return updateVersionedState(versionedMetadata2, State.ACTIVE);
        }).thenApply((Function<? super U, ? extends U>) versionedMetadata3 -> {
            log.warn("Scale epoch transition record is inconsistent with VersionedMetadata in the table. {}", Integer.valueOf(((EpochTransitionRecord) versionedMetadata.getObject()).getNewEpoch()));
            throw new IllegalStateException("Epoch transition record is inconsistent.");
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return getActiveEpochRecord(true).thenCompose(epochRecord -> {
            if (epochRecord.getEpoch() >= ((EpochTransitionRecord) versionedMetadata.getObject()).getNewEpoch()) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }
            EpochTransitionRecord epochTransitionRecord = (EpochTransitionRecord) versionedMetadata.getObject();
            long max = Math.max(epochTransitionRecord.getTime(), epochRecord.getCreationTime() + 1);
            ImmutableList.Builder builder = ImmutableList.builder();
            epochTransitionRecord.getNewSegmentsWithRange().forEach((l, entry) -> {
                builder.add(newSegmentRecord(l.longValue(), max, (Double) entry.getKey(), (Double) entry.getValue()));
            });
            ImmutableList.Builder builder2 = ImmutableList.builder();
            epochTransitionRecord.getSegmentsToSeal().forEach(l2 -> {
                builder2.add(epochRecord.getSegment(l2.longValue()));
            });
            ImmutableList.Builder builder3 = ImmutableList.builder();
            epochRecord.getSegments().forEach(streamSegmentRecord -> {
                if (epochTransitionRecord.getSegmentsToSeal().contains(Long.valueOf(streamSegmentRecord.segmentId()))) {
                    return;
                }
                builder3.add(streamSegmentRecord);
            });
            ImmutableList build = builder.build();
            builder3.addAll(build);
            EpochRecord epochRecord = new EpochRecord(epochTransitionRecord.getNewEpoch(), epochTransitionRecord.getNewEpoch(), builder3.build(), max);
            HistoryTimeSeriesRecord historyTimeSeriesRecord = new HistoryTimeSeriesRecord(epochTransitionRecord.getNewEpoch(), epochTransitionRecord.getNewEpoch(), builder2.build(), build, epochRecord.getCreationTime());
            return createEpochRecord(epochRecord).thenCompose(r5 -> {
                return updateHistoryTimeSeries(historyTimeSeriesRecord);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r6 -> {
                return createSegmentSealedEpochRecords(epochTransitionRecord.getSegmentsToSeal(), epochTransitionRecord.getNewEpoch());
            }).thenApply(r3 -> {
                return versionedMetadata;
            });
        });
    }

    private CompletableFuture<Void> updateHistoryTimeSeries(HistoryTimeSeriesRecord historyTimeSeriesRecord) {
        int epoch = historyTimeSeriesRecord.getEpoch() / this.historyChunkSize.get();
        return historyTimeSeriesRecord.getEpoch() % this.historyChunkSize.get() == 0 ? createHistoryTimeSeriesChunk(epoch, historyTimeSeriesRecord) : getHistoryTimeSeriesChunkData(epoch, true).thenCompose(versionedMetadata -> {
            HistoryTimeSeries historyTimeSeries = (HistoryTimeSeries) versionedMetadata.getObject();
            return historyTimeSeries.getLatestRecord().getEpoch() < historyTimeSeriesRecord.getEpoch() ? Futures.toVoid(updateHistoryTimeSeriesChunkData(epoch, new VersionedMetadata<>(HistoryTimeSeries.addHistoryRecord(historyTimeSeries, historyTimeSeriesRecord), versionedMetadata.getVersion()))) : CompletableFuture.completedFuture(null);
        });
    }

    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> migrateManualScaleToNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, EpochRecord epochRecord) {
        EpochTransitionRecord object = versionedMetadata.getObject();
        return getEpochRecord(object.getActiveEpoch()).thenCompose(epochRecord2 -> {
            if (object.getActiveEpoch() == epochRecord.getEpoch()) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }
            if (epochRecord.getEpoch() <= object.getActiveEpoch() || epochRecord.getReferenceEpoch() != epochRecord2.getReferenceEpoch()) {
                return updateEpochTransitionNode(new VersionedMetadata<>(EpochTransitionRecord.EMPTY, versionedMetadata.getVersion())).thenCompose(version -> {
                    return updateVersionedState(versionedMetadata2, State.ACTIVE);
                }).thenApply((Function<? super U, ? extends U>) versionedMetadata3 -> {
                    log.warn("Scale epoch transition record is inconsistent with VersionedMetadata in the table. {}", Integer.valueOf(object.getNewEpoch()));
                    throw new IllegalStateException("Epoch transition record is inconsistent.");
                });
            }
            EpochTransitionRecord computeEpochTransition = RecordHelper.computeEpochTransition(epochRecord, (List) object.getSegmentsToSeal().stream().map(l -> {
                return Long.valueOf(NameUtils.computeSegmentId(NameUtils.getSegmentNumber(l.longValue()), epochRecord.getEpoch()));
            }).collect(Collectors.toList()), Lists.newArrayList(object.getNewSegmentsWithRange().values()), object.getTime());
            return updateEpochTransitionNode(new VersionedMetadata<>(computeEpochTransition, versionedMetadata.getVersion())).thenApply(version2 -> {
                return new VersionedMetadata(computeEpochTransition, version2);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition() {
        return getEpochTransitionNode().thenApply(versionedMetadata -> {
            return new VersionedMetadata(versionedMetadata.getObject(), versionedMetadata.getVersion());
        });
    }

    private CompletableFuture<Void> clearMarkers(Set<Long> set) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((java.util.stream.Stream) set.stream().parallel()).map((v1) -> {
            return removeColdMarker(v1);
        }).collect(Collectors.toList())));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> scaleOldSegmentsSealed(Map<Long, Long> map, VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        EpochTransitionRecord object = versionedMetadata.getObject();
        return Futures.toVoid(clearMarkers(object.getSegmentsToSeal()).thenCompose(r5 -> {
            return updateSealedSegmentSizes(map);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r52 -> {
            return updateCurrentEpochRecord(object.getNewEpoch());
        }));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeScale(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        Preconditions.checkNotNull(versionedMetadata);
        Preconditions.checkArgument(!versionedMetadata.getObject().equals(EpochTransitionRecord.EMPTY));
        return Futures.toVoid(updateEpochTransitionNode(new VersionedMetadata<>(EpochTransitionRecord.EMPTY, versionedMetadata.getVersion())));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(int i, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        CommittingTransactionsRecord object = versionedMetadata.getObject();
        if (object.isRollingTxnRecord()) {
            return CompletableFuture.completedFuture(versionedMetadata);
        }
        CommittingTransactionsRecord createRollingTxnRecord = object.createRollingTxnRecord(i);
        return updateCommittingTxnRecord(new VersionedMetadata<>(createRollingTxnRecord, versionedMetadata.getVersion())).thenApply(version -> {
            return new VersionedMetadata(createRollingTxnRecord, version);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(Map<Long, Long> map, long j, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        Preconditions.checkArgument(versionedMetadata.getObject().isRollingTxnRecord());
        CommittingTransactionsRecord object = versionedMetadata.getObject();
        return getActiveEpoch(true).thenCompose(epochRecord -> {
            return getEpochRecord(object.getEpoch()).thenCompose(epochRecord -> {
                if (epochRecord.getEpoch() > object.getCurrentEpoch()) {
                    log.debug("Duplicate Epochs {} already created. Ignore.", Integer.valueOf(object.getNewActiveEpoch()));
                    return CompletableFuture.completedFuture(null);
                }
                long max = Math.max(epochRecord.getCreationTime() + 1, j);
                ImmutableList.Builder builder = ImmutableList.builder();
                epochRecord.getSegments().stream().forEach(streamSegmentRecord -> {
                    builder.add(newSegmentRecord(NameUtils.computeSegmentId(NameUtils.getSegmentNumber(streamSegmentRecord.segmentId()), object.getNewTxnEpoch()), max, Double.valueOf(streamSegmentRecord.getKeyStart()), Double.valueOf(streamSegmentRecord.getKeyEnd())));
                });
                ImmutableList.Builder builder2 = ImmutableList.builder();
                epochRecord.getSegments().stream().forEach(streamSegmentRecord2 -> {
                    builder2.add(newSegmentRecord(NameUtils.computeSegmentId(NameUtils.getSegmentNumber(streamSegmentRecord2.segmentId()), object.getNewActiveEpoch()), max + 1, Double.valueOf(streamSegmentRecord2.getKeyStart()), Double.valueOf(streamSegmentRecord2.getKeyEnd())));
                });
                EpochRecord epochRecord = new EpochRecord(object.getNewTxnEpoch(), epochRecord.getReferenceEpoch(), builder.build(), max);
                EpochRecord epochRecord2 = new EpochRecord(object.getNewActiveEpoch(), epochRecord.getReferenceEpoch(), builder2.build(), max + 1);
                HistoryTimeSeriesRecord historyTimeSeriesRecord = new HistoryTimeSeriesRecord(epochRecord.getEpoch(), epochRecord.getReferenceEpoch(), ImmutableList.of(), ImmutableList.of(), max);
                HistoryTimeSeriesRecord historyTimeSeriesRecord2 = new HistoryTimeSeriesRecord(epochRecord2.getEpoch(), epochRecord2.getReferenceEpoch(), ImmutableList.of(), ImmutableList.of(), max + 1);
                return createEpochRecord(epochRecord).thenCompose(r5 -> {
                    return updateHistoryTimeSeries(historyTimeSeriesRecord);
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r52 -> {
                    return createEpochRecord(epochRecord2);
                }).thenCompose(r53 -> {
                    return updateHistoryTimeSeries(historyTimeSeriesRecord2);
                }).thenCompose(r7 -> {
                    return createSegmentSealedEpochRecords((Collection) epochRecord.getSegments().stream().map((v0) -> {
                        return v0.segmentId();
                    }).collect(Collectors.toList()), epochRecord.getEpoch());
                }).thenCompose(r72 -> {
                    return createSegmentSealedEpochRecords((Collection) epochRecord.getSegments().stream().map((v0) -> {
                        return v0.segmentId();
                    }).collect(Collectors.toList()), epochRecord2.getEpoch());
                });
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r5 -> {
                return updateSealedSegmentSizes(map);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeRollingTxn(Map<Long, Long> map, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        return getActiveEpoch(true).thenCompose(epochRecord -> {
            CommittingTransactionsRecord committingTransactionsRecord = (CommittingTransactionsRecord) versionedMetadata.getObject();
            return epochRecord.getEpoch() == committingTransactionsRecord.getCurrentEpoch() ? updateSealedSegmentSizes(map).thenCompose(r5 -> {
                return clearMarkers(map.keySet());
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r52 -> {
                return updateCurrentEpochRecord(committingTransactionsRecord.getNewActiveEpoch());
            }) : CompletableFuture.completedFuture(null);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<UUID> generateNewTxnId(int i, long j) {
        return getActiveEpochRecord(true).thenApply(epochRecord -> {
            return RecordHelper.generateTxnId(epochRecord.getReferenceEpoch(), i, j);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> createTransaction(UUID uuid, long j, long j2) {
        long currentTimeMillis = System.currentTimeMillis();
        long j3 = currentTimeMillis + j;
        long j4 = currentTimeMillis + j2;
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        ActiveTxnRecord m154build = ActiveTxnRecord.builder().txnStatus(TxnStatus.OPEN).leaseExpiryTime(j3).txCreationTimestamp(currentTimeMillis).maxExecutionExpiryTime(j4).writerId(Optional.empty()).commitTime(Optional.empty()).commitOrder(Optional.empty()).m154build();
        return verifyNotSealed().thenCompose(r16 -> {
            return createNewTransaction(transactionEpoch, uuid, m154build).thenApply(version -> {
                return new VersionedTransactionData(transactionEpoch, uuid, version, TxnStatus.OPEN, currentTimeMillis, j4, "", Long.MIN_VALUE, Long.MIN_VALUE, ImmutableMap.of());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> pingTransaction(VersionedTransactionData versionedTransactionData, long j) {
        int epoch = versionedTransactionData.getEpoch();
        UUID id = versionedTransactionData.getId();
        Version version = versionedTransactionData.getVersion();
        long creationTime = versionedTransactionData.getCreationTime();
        long maxExecutionExpiryTime = versionedTransactionData.getMaxExecutionExpiryTime();
        TxnStatus status = versionedTransactionData.getStatus();
        String writerId = versionedTransactionData.getWriterId();
        long longValue = versionedTransactionData.getCommitTime().longValue();
        long longValue2 = versionedTransactionData.getPosition().longValue();
        ImmutableMap<Long, Long> commitOffsets = versionedTransactionData.getCommitOffsets();
        return updateActiveTx(epoch, id, new VersionedMetadata<>(new ActiveTxnRecord(creationTime, System.currentTimeMillis() + j, maxExecutionExpiryTime, status, writerId, longValue, longValue2, commitOffsets), version)).thenApply(version2 -> {
            return new VersionedTransactionData(epoch, id, version2, status, creationTime, maxExecutionExpiryTime, writerId, Long.valueOf(longValue), Long.valueOf(longValue2), commitOffsets);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> getTransactionData(UUID uuid) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return getActiveTx(transactionEpoch, uuid).thenApply(versionedMetadata -> {
            ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) versionedMetadata.getObject();
            return new VersionedTransactionData(transactionEpoch, uuid, versionedMetadata.getVersion(), activeTxnRecord.getTxnStatus(), activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getMaxExecutionExpiryTime(), activeTxnRecord.getWriterId(), Long.valueOf(activeTxnRecord.getCommitTime()), Long.valueOf(activeTxnRecord.getCommitOrder()), activeTxnRecord.getCommitOffsets());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> checkTransactionStatus(UUID uuid) {
        return getActiveTx(RecordHelper.getTransactionEpoch(uuid), uuid).handle((versionedMetadata, th) -> {
            if (th != null && (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException)) {
                return TxnStatus.UNKNOWN;
            }
            if (th != null) {
                throw new CompletionException(th);
            }
            return ((ActiveTxnRecord) versionedMetadata.getObject()).getTxnStatus();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus -> {
            return txnStatus.equals(TxnStatus.UNKNOWN) ? getCompletedTxnStatus(uuid) : CompletableFuture.completedFuture(txnStatus);
        });
    }

    private CompletableFuture<TxnStatus> getCompletedTxnStatus(UUID uuid) {
        return getCompletedTx(uuid).handle((versionedMetadata, th) -> {
            if (th != null && (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException)) {
                return TxnStatus.UNKNOWN;
            }
            if (th != null) {
                throw new CompletionException(th);
            }
            return ((CompletedTxnRecord) versionedMetadata.getObject()).getCompletionStatus();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(UUID uuid, boolean z, Optional<Version> optional, String str, long j) {
        return sealActiveTxn(RecordHelper.getTransactionEpoch(uuid), uuid, z, optional, str, j).exceptionally(th -> {
            return new AbstractMap.SimpleEntry(handleDataNotFoundException(th), null);
        }).thenCompose(simpleEntry -> {
            return simpleEntry.getKey() == TxnStatus.UNKNOWN ? validateCompletedTxn(uuid, z).thenApply(txnStatus -> {
                return new AbstractMap.SimpleEntry(txnStatus, null);
            }) : CompletableFuture.completedFuture(simpleEntry);
        });
    }

    private CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealActiveTxn(int i, UUID uuid, boolean z, Optional<Version> optional, String str, long j) {
        return getActiveTx(i, uuid).thenCompose(versionedMetadata -> {
            ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) versionedMetadata.getObject();
            versionedMetadata.getClass();
            Version version = (Version) optional.orElseGet(versionedMetadata::getVersion);
            TxnStatus txnStatus = activeTxnRecord.getTxnStatus();
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.INFO /* 3 */:
                    if (z) {
                        return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                    }
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                case ApiResponseMessage.WARNING /* 2 */:
                    return sealActiveTx(i, uuid, z, activeTxnRecord, version, str, j).thenApply(version2 -> {
                        return new AbstractMap.SimpleEntry(z ? TxnStatus.COMMITTING : TxnStatus.ABORTING, Integer.valueOf(i));
                    });
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    if (z) {
                        throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                    }
                    return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        });
    }

    private CompletableFuture<Version> sealActiveTx(int i, UUID uuid, boolean z, ActiveTxnRecord activeTxnRecord, Version version, String str, long j) {
        return (z ? !activeTxnRecord.getTxnStatus().equals(TxnStatus.COMMITTING) ? addTxnToCommitOrder(uuid).thenApply(l -> {
            return new ActiveTxnRecord(activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getLeaseExpiryTime(), activeTxnRecord.getMaxExecutionExpiryTime(), TxnStatus.COMMITTING, str, j, l.longValue());
        }) : CompletableFuture.completedFuture(activeTxnRecord) : CompletableFuture.completedFuture(new ActiveTxnRecord(activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getLeaseExpiryTime(), activeTxnRecord.getMaxExecutionExpiryTime(), TxnStatus.ABORTING))).thenCompose(activeTxnRecord2 -> {
            return updateActiveTx(i, uuid, new VersionedMetadata<>(activeTxnRecord2, version));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> recordCommitOffsets(UUID uuid, Map<Long, Long> map) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return Futures.exceptionallyExpecting(getActiveTx(transactionEpoch, uuid).thenCompose(versionedMetadata -> {
            ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) versionedMetadata.getObject();
            Preconditions.checkArgument(activeTxnRecord.getTxnStatus().equals(TxnStatus.COMMITTING));
            return activeTxnRecord.getCommitOffsets().isEmpty() ? Futures.toVoid(updateActiveTx(transactionEpoch, uuid, new VersionedMetadata<>(new ActiveTxnRecord(activeTxnRecord.getTxCreationTimestamp(), activeTxnRecord.getLeaseExpiryTime(), activeTxnRecord.getMaxExecutionExpiryTime(), TxnStatus.COMMITTING, activeTxnRecord.getWriterId(), activeTxnRecord.getCommitTime(), activeTxnRecord.getCommitOrder(), (ImmutableMap<Long, Long>) ImmutableMap.copyOf(map)), versionedMetadata.getVersion()))) : CompletableFuture.completedFuture(null);
        }), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> generateMarksForTransactions(CommittingTransactionsRecord committingTransactionsRecord) {
        return Futures.allOfWithResults((List) committingTransactionsRecord.getTransactionsToCommit().stream().map(uuid -> {
            return Futures.exceptionallyExpecting(getActiveTx(RecordHelper.getTransactionEpoch(uuid), uuid), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object) null);
        }).collect(Collectors.toList())).thenCompose(list -> {
            return Futures.allOf((List) ((Map) list.stream().filter(versionedMetadata -> {
                return (versionedMetadata == null || Strings.isNullOrEmpty(((ActiveTxnRecord) versionedMetadata.getObject()).getWriterId()) || ((ActiveTxnRecord) versionedMetadata.getObject()).getCommitTime() < 0 || ((ActiveTxnRecord) versionedMetadata.getObject()).getCommitOffsets().isEmpty()) ? false : true;
            }).collect(Collectors.groupingBy(versionedMetadata2 -> {
                return ((ActiveTxnRecord) versionedMetadata2.getObject()).getWriterId();
            }))).entrySet().stream().map(entry -> {
                ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) ((VersionedMetadata) ((List) entry.getValue()).stream().max(Comparator.comparingLong(versionedMetadata3 -> {
                    return ((ActiveTxnRecord) versionedMetadata3.getObject()).getCommitTime();
                })).get()).getObject();
                return Futures.exceptionallyExpecting(noteWriterMark(activeTxnRecord.getWriterId(), activeTxnRecord.getCommitTime(), activeTxnRecord.getCommitOffsets()), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object) null);
            }).collect(Collectors.toList()));
        });
    }

    @VisibleForTesting
    public CompletableFuture<TxnStatus> commitTransaction(UUID uuid) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return checkTransactionStatus(uuid).thenApply(txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.INFO /* 3 */:
                    return txnStatus;
                case ApiResponseMessage.WARNING /* 2 */:
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.toString());
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus2 -> {
            return txnStatus2.equals(TxnStatus.COMMITTING) ? createCompletedTxEntry(uuid, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.COMMITTED)) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.COMMITTED;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> abortTransaction(UUID uuid) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return checkTransactionStatus(uuid).thenApply(txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                case ApiResponseMessage.INFO /* 3 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    return txnStatus;
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus2 -> {
            return txnStatus2.equals(TxnStatus.ABORTING) ? createCompletedTxEntry(uuid, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.ABORTED)) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.ABORTED;
        });
    }

    private TxnStatus handleDataNotFoundException(Throwable th) {
        if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
            return TxnStatus.UNKNOWN;
        }
        throw th;
    }

    private CompletableFuture<TxnStatus> validateCompletedTxn(UUID uuid, boolean z) {
        return getCompletedTxnStatus(uuid).thenApply(txnStatus -> {
            if ((z && txnStatus == TxnStatus.COMMITTED) || (!z && txnStatus == TxnStatus.ABORTED)) {
                return txnStatus;
            }
            if (txnStatus == TxnStatus.UNKNOWN) {
                throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
            throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<EpochRecord> getActiveEpoch(boolean z) {
        return getCurrentEpochRecordData(z).thenApply((v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<EpochRecord> getEpochRecord(int i) {
        return getEpochRecordData(i).thenApply((v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> setColdMarker(long j, long j2) {
        return getMarkerData(j).thenCompose(versionedMetadata -> {
            return versionedMetadata != null ? Futures.toVoid(updateMarkerData(j, new VersionedMetadata<>(Long.valueOf(j2), versionedMetadata.getVersion()))) : createMarkerData(j, j2);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getColdMarker(long j) {
        return getMarkerData(j).thenApply(versionedMetadata -> {
            if (versionedMetadata != null) {
                return (Long) versionedMetadata.getObject();
            }
            return 0L;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> removeColdMarker(long j) {
        return removeMarkerData(j);
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getSizeTillStreamCut(Map<Long, Long> map, Optional<StreamCutRecord> optional) {
        Map<Long, Long> map2 = (Map) optional.map((v0) -> {
            return v0.getStreamCut();
        }).orElse(Collections.emptyMap());
        return segmentsBetweenStreamCuts(map2, map).thenCompose(immutableSet -> {
            return sizeBetweenStreamCuts(map2, map, immutableSet).thenApply(l -> {
                return Long.valueOf(l.longValue() + ((Long) optional.map((v0) -> {
                    return v0.getRecordingSize();
                }).orElse(0L)).longValue());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> addStreamCutToRetentionSet(StreamCutRecord streamCutRecord) {
        return getRetentionSetData().thenCompose(versionedMetadata -> {
            RetentionSet addReferenceToStreamCutIfLatest = RetentionSet.addReferenceToStreamCutIfLatest((RetentionSet) versionedMetadata.getObject(), streamCutRecord);
            return createStreamCutRecordData(streamCutRecord.getRecordingTime(), streamCutRecord).thenCompose(r9 -> {
                return Futures.toVoid(updateRetentionSetData(new VersionedMetadata<>(addReferenceToStreamCutIfLatest, versionedMetadata.getVersion())));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<RetentionSet> getRetentionSet() {
        return getRetentionSetData().thenApply((v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(StreamCutReferenceRecord streamCutReferenceRecord) {
        return getStreamCutRecordData(streamCutReferenceRecord.getRecordingTime()).thenApply((v0) -> {
            return v0.getObject();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteStreamCutBefore(StreamCutReferenceRecord streamCutReferenceRecord) {
        return getRetentionSetData().thenCompose(versionedMetadata -> {
            RetentionSet retentionSet = (RetentionSet) versionedMetadata.getObject();
            RetentionSet removeStreamCutBefore = RetentionSet.removeStreamCutBefore(retentionSet, streamCutReferenceRecord);
            return Futures.allOf((Collection) retentionSet.retentionRecordsBefore(streamCutReferenceRecord).stream().map(streamCutReferenceRecord2 -> {
                return deleteStreamCutRecordData(streamCutReferenceRecord2.getRecordingTime());
            }).collect(Collectors.toList())).thenCompose(r9 -> {
                return Futures.toVoid(updateRetentionSetData(new VersionedMetadata<>(removeStreamCutBefore, versionedMetadata.getVersion())));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startCommittingTransactions(int i) {
        return getVersionedCommitTransactionsRecord().thenCompose(versionedMetadata -> {
            return ((CommittingTransactionsRecord) versionedMetadata.getObject()).equals(CommittingTransactionsRecord.EMPTY) ? getOrderedCommittingTxnInLowestEpoch(i).thenCompose(list -> {
                if (list.isEmpty()) {
                    return CompletableFuture.completedFuture(versionedMetadata);
                }
                Map.Entry entry = (Map.Entry) list.get(0);
                ImmutableList.Builder builder = ImmutableList.builder();
                list.forEach(entry2 -> {
                    builder.add(entry2.getKey());
                });
                List list = (List) list.stream().map(entry3 -> {
                    return Long.valueOf(((ActiveTxnRecord) entry3.getValue()).getCommitOrder());
                }).collect(Collectors.toList());
                CommittingTransactionsRecord committingTransactionsRecord = new CommittingTransactionsRecord(RecordHelper.getTransactionEpoch((UUID) entry.getKey()), builder.build());
                return updateCommittingTxnRecord(new VersionedMetadata<>(committingTransactionsRecord, versionedMetadata.getVersion())).thenCompose(version -> {
                    return removeTxnsFromCommitOrder(list).thenApply(r7 -> {
                        return new VersionedMetadata(committingTransactionsRecord, version);
                    });
                });
            }) : CompletableFuture.completedFuture(versionedMetadata);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommitTransactionsRecord() {
        return getCommitTxnRecord().thenApply(versionedMetadata -> {
            return new VersionedMetadata(versionedMetadata.getObject(), versionedMetadata.getVersion());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        CompletableFuture<Void> generateMarksForTransactions = generateMarksForTransactions(versionedMetadata.getObject());
        UnmodifiableIterator it = versionedMetadata.getObject().getTransactionsToCommit().iterator();
        while (it.hasNext()) {
            UUID uuid = (UUID) it.next();
            log.debug("Committing transaction {} on stream {}/{}", new Object[]{uuid, this.scope, this.name});
            generateMarksForTransactions = generateMarksForTransactions.thenCompose(r6 -> {
                return commitTransaction(uuid).thenAccept(txnStatus -> {
                    log.debug("transaction {} on stream {}/{} committed successfully", new Object[]{uuid, this.scope, this.name});
                });
            });
        }
        return generateMarksForTransactions.thenCompose(r4 -> {
            return getNumberOfOngoingTransactions().thenAccept(num -> {
                TransactionMetrics.reportOpenTransactions(getScope(), getName(), num.intValue());
            });
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r8 -> {
            return Futures.toVoid(updateCommittingTxnRecord(new VersionedMetadata<>(CommittingTransactionsRecord.EMPTY, versionedMetadata.getVersion())));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String str) {
        return createWaitingRequestNodeIfAbsent(str);
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<String> getWaitingRequestProcessor() {
        return getWaitingRequestNode().handle((str, th) -> {
            if (th == null) {
                return str;
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String str) {
        return getWaitingRequestProcessor().thenCompose(str2 -> {
            return (str2 == null || !str2.equals(str)) ? CompletableFuture.completedFuture(null) : deleteWaitingRequestNode();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<WriterTimestampResponse> noteWriterMark(String str, long j, Map<Long, Long> map) {
        ImmutableMap copyOf = ImmutableMap.copyOf(map);
        return Futures.exceptionallyExpecting(getWriterMarkRecord(str), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object) null).thenCompose(versionedMetadata -> {
            return versionedMetadata == null ? createWriterMarkRecord(str, j, copyOf).exceptionally(th -> {
                if (Exceptions.unwrap(th) instanceof StoreException.DataExistsException) {
                    throw StoreException.create(StoreException.Type.WRITE_CONFLICT, "writer mark exists");
                }
                throw new CompletionException(th);
            }).thenApply(r2 -> {
                return WriterTimestampResponse.SUCCESS;
            }) : ((WriterMark) versionedMetadata.getObject()).getTimestamp() > j ? CompletableFuture.completedFuture(WriterTimestampResponse.INVALID_TIME) : !compareWriterPositions(((WriterMark) versionedMetadata.getObject()).getPosition(), copyOf) ? CompletableFuture.completedFuture(WriterTimestampResponse.INVALID_POSITION) : updateWriterMarkRecord(str, j, copyOf, true, versionedMetadata.getVersion()).thenApply(r22 -> {
                return WriterTimestampResponse.SUCCESS;
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> shutdownWriter(String str) {
        return getWriterMarkRecord(str).thenCompose(versionedMetadata -> {
            return updateWriterMarkRecord(str, ((WriterMark) versionedMetadata.getObject()).getTimestamp(), ((WriterMark) versionedMetadata.getObject()).getPosition(), false, versionedMetadata.getVersion());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> removeWriter(String str, WriterMark writerMark) {
        return Futures.exceptionallyExpecting(getWriterMarkRecord(str), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, (Object) null).thenCompose(versionedMetadata -> {
            if (versionedMetadata == null) {
                return CompletableFuture.completedFuture(null);
            }
            if (writerMark.equals(versionedMetadata.getObject())) {
                return removeWriterRecord(str, versionedMetadata.getVersion());
            }
            throw StoreException.create(StoreException.Type.WRITE_CONFLICT, "Writer mark supplied for removal doesn't match stored writer mark");
        });
    }

    @VisibleForTesting
    boolean compareWriterPositions(Map<Long, Long> map, Map<Long, Long> map2) {
        java.util.stream.Stream<Long> stream = map2.keySet().stream();
        map.getClass();
        long longValue = stream.filter((v1) -> {
            return r1.containsKey(v1);
        }).max((v0, v1) -> {
            return Long.compare(v0, v1);
        }).orElse(Long.MIN_VALUE).longValue();
        java.util.stream.Stream<Long> stream2 = map.keySet().stream();
        map2.getClass();
        if (longValue >= stream2.filter((v1) -> {
            return r1.containsKey(v1);
        }).max((v0, v1) -> {
            return Long.compare(v0, v1);
        }).orElse(Long.MIN_VALUE).longValue()) {
            return map2.entrySet().stream().filter(entry -> {
                return map.containsKey(entry.getKey());
            }).allMatch(entry2 -> {
                return ((Long) entry2.getValue()).longValue() >= ((Long) map.get(entry2.getKey())).longValue();
            });
        }
        return false;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<WriterMark> getWriterMark(String str) {
        return getWriterMarkRecord(str).thenApply((v0) -> {
            return v0.getObject();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpochHelper(ZkOrderedStore zkOrderedStore, int i, Executor executor) {
        return Futures.exceptionallyExpecting(zkOrderedStore.getEntitiesWithPosition(getScope(), getName()), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, Collections.emptyMap()).thenCompose(map -> {
            Iterator it = ((Map) map.entrySet().stream().collect(Collectors.groupingBy(entry -> {
                return Integer.valueOf(RecordHelper.getTransactionEpoch(UUID.fromString((String) entry.getValue())));
            }))).entrySet().stream().sorted(Comparator.comparingInt((v0) -> {
                return v0.getKey();
            })).iterator();
            ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
            ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
            return Futures.loop(() -> {
                return Boolean.valueOf(it.hasNext() && concurrentHashMap.isEmpty());
            }, () -> {
                return processTransactionsInEpoch((Map.Entry) it.next(), concurrentSkipListSet, concurrentHashMap, i, executor);
            }, executor).thenCompose(r8 -> {
                return zkOrderedStore.removeEntities(getScope(), getName(), concurrentSkipListSet);
            }).thenApply(r4 -> {
                return (List) concurrentHashMap.entrySet().stream().sorted(Comparator.comparing(entry2 -> {
                    return Long.valueOf(((ActiveTxnRecord) entry2.getValue()).getCommitOrder());
                })).collect(Collectors.toList());
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxnsHelper(ZkOrderedStore zkOrderedStore) {
        return Futures.exceptionallyExpecting(zkOrderedStore.getEntitiesWithPosition(getScope(), getName()), ZKStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, Collections.emptyMap()).thenApply(map -> {
            return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return UUID.fromString((String) entry.getValue());
            }));
        });
    }

    private CompletableFuture<Void> processTransactionsInEpoch(Map.Entry<Integer, List<Map.Entry<Long, String>>> entry, ConcurrentSkipListSet<Long> concurrentSkipListSet, ConcurrentHashMap<UUID, ActiveTxnRecord> concurrentHashMap, int i, Executor executor) {
        int intValue = entry.getKey().intValue();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        entry.getValue().forEach(entry2 -> {
            arrayList.add(entry2.getKey());
            arrayList2.add(entry2.getValue());
        });
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(Math.min(i, arrayList2.size()));
        return Futures.loop(() -> {
            return Boolean.valueOf(atomicInteger.get() < arrayList2.size() && concurrentHashMap.size() < i);
        }, () -> {
            return getTransactionRecords(intValue, arrayList2.subList(atomicInteger.get(), atomicInteger2.get())).thenAccept(list -> {
                for (int i2 = 0; i2 < list.size(); i2++) {
                    ActiveTxnRecord activeTxnRecord = (ActiveTxnRecord) list.get(i2);
                    UUID fromString = UUID.fromString((String) arrayList2.get(i2));
                    long longValue = ((Long) arrayList.get(i2)).longValue();
                    switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[activeTxnRecord.getTxnStatus().ordinal()]) {
                        case ApiResponseMessage.ERROR /* 1 */:
                            if (activeTxnRecord.getCommitOrder() == longValue) {
                                concurrentHashMap.put(fromString, activeTxnRecord);
                                if (concurrentHashMap.size() >= i) {
                                }
                                break;
                            } else {
                                log.debug("duplicate txn {} at position {}. removing {}", new Object[]{fromString, Long.valueOf(activeTxnRecord.getCommitOrder()), Long.valueOf(longValue)});
                                concurrentSkipListSet.add(Long.valueOf(longValue));
                                break;
                            }
                        case ApiResponseMessage.INFO /* 3 */:
                        case ApiResponseMessage.OK /* 4 */:
                        case ApiResponseMessage.TOO_BUSY /* 5 */:
                        case 6:
                            log.debug("stale txn {} with status. removing {}", new Object[]{fromString, activeTxnRecord.getTxnStatus(), Long.valueOf(longValue)});
                            concurrentSkipListSet.add(Long.valueOf(longValue));
                            break;
                    }
                }
                atomicInteger.set(atomicInteger2.get());
                atomicInteger2.set(Math.min(atomicInteger.get() + i, arrayList2.size()));
            });
        }, executor);
    }

    CompletableFuture<List<ActiveTxnRecord>> getTransactionRecords(int i, List<String> list) {
        return Futures.allOfWithResults((List) list.stream().map(str -> {
            return Futures.exceptionallyExpecting(getActiveTx(i, UUID.fromString(str)).thenApply((v0) -> {
                return v0.getObject();
            }), ZKStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, ActiveTxnRecord.EMPTY);
        }).collect(Collectors.toList()));
    }

    private CompletableFuture<Void> verifyLegalState() {
        return getState(false).thenApply(state -> {
            if (state == null || state.equals(State.UNKNOWN) || state.equals(State.CREATING)) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + state.name());
            }
            return null;
        });
    }

    private CompletableFuture<Void> createEpochRecord(EpochRecord epochRecord) {
        return createEpochRecordDataIfAbsent(epochRecord.getEpoch(), epochRecord);
    }

    private CompletableFuture<Void> updateCurrentEpochRecord(int i) {
        return getEpochRecord(i).thenCompose(epochRecord -> {
            return getCurrentEpochRecordData(true).thenCompose(versionedMetadata -> {
                return ((EpochRecord) versionedMetadata.getObject()).getEpoch() < i ? Futures.toVoid(updateCurrentEpochRecordData(new VersionedMetadata<>(epochRecord, versionedMetadata.getVersion()))) : CompletableFuture.completedFuture(null);
            });
        });
    }

    private CompletableFuture<Void> createSealedSegmentSizeMapShardIfAbsent(int i) {
        return createSealedSegmentSizesMapShardDataIfAbsent(i, SealedSegmentsMapShard.builder().shardNumber(i).sealedSegmentsSizeMap(Collections.emptyMap()).m179build());
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<SealedSegmentsMapShard> getSealedSegmentSizeMapShard(int i) {
        return getSealedSegmentSizesMapShardData(i).handle((versionedMetadata, th) -> {
            if (th == null) {
                return (SealedSegmentsMapShard) versionedMetadata.getObject();
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return SealedSegmentsMapShard.builder().shardNumber(i).sealedSegmentsSizeMap(Collections.emptyMap()).m179build();
            }
            throw new CompletionException(th);
        });
    }

    private CompletableFuture<Void> updateSealedSegmentSizes(Map<Long, Long> map) {
        return Futures.allOf((Collection) ((Map) map.keySet().stream().collect(Collectors.groupingBy((v1) -> {
            return getShardNumber(v1);
        }))).entrySet().stream().map(entry -> {
            int intValue = ((Integer) entry.getKey()).intValue();
            List list = (List) entry.getValue();
            return Futures.exceptionallyComposeExpecting(getSealedSegmentSizesMapShardData(intValue), AbstractStreamMetadataStore.DATA_NOT_FOUND_PREDICATE, () -> {
                return createSealedSegmentSizeMapShardIfAbsent(intValue).thenCompose(r5 -> {
                    return getSealedSegmentSizesMapShardData(intValue);
                });
            }).thenCompose(versionedMetadata -> {
                SealedSegmentsMapShard sealedSegmentsMapShard = (SealedSegmentsMapShard) versionedMetadata.getObject();
                list.forEach(l -> {
                    sealedSegmentsMapShard.addSealedSegmentSize(l.longValue(), ((Long) map.get(l)).longValue());
                });
                return updateSealedSegmentSizesMapShardData(intValue, new VersionedMetadata<>(sealedSegmentsMapShard, versionedMetadata.getVersion()));
            });
        }).collect(Collectors.toList()));
    }

    private int getShardNumber(long j) {
        return NameUtils.getEpoch(j) / this.shardSize.get();
    }

    private ImmutableMap<StreamSegmentRecord, Integer> convertToSpan(EpochRecord epochRecord) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        epochRecord.getSegments().forEach(streamSegmentRecord -> {
            builder.put(streamSegmentRecord, Integer.valueOf(epochRecord.getEpoch()));
        });
        return builder.build();
    }

    private Segment transform(StreamSegmentRecord streamSegmentRecord) {
        return new Segment(streamSegmentRecord.segmentId(), streamSegmentRecord.getCreationTime(), streamSegmentRecord.getKeyStart(), streamSegmentRecord.getKeyEnd());
    }

    private List<Segment> transform(List<StreamSegmentRecord> list) {
        return (List) list.stream().map(this::transform).collect(Collectors.toList());
    }

    @VisibleForTesting
    CompletableFuture<List<EpochRecord>> fetchEpochs(int i, int i2, boolean z) {
        return getActiveEpochRecord(z).thenApply(epochRecord -> {
            return Integer.valueOf(epochRecord.getEpoch() / this.historyChunkSize.get());
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) num -> {
            return Futures.allOfWithResults((List) IntStream.range(i / this.historyChunkSize.get(), (i2 / this.historyChunkSize.get()) + 1).mapToObj(i3 -> {
                return getEpochsFromHistoryChunk(i3, i3 * this.historyChunkSize.get() > i ? i3 * this.historyChunkSize.get() : i, i2, i3 >= num.intValue());
            }).collect(Collectors.toList()));
        }).thenApply(list -> {
            return (List) list.stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
        });
    }

    private CompletableFuture<List<EpochRecord>> getEpochsFromHistoryChunk(int i, int i2, int i3, boolean z) {
        return getEpochRecord(i2).thenCompose(epochRecord -> {
            return getHistoryTimeSeriesChunk(i, z).thenCompose(historyTimeSeries -> {
                ArrayList arrayList = new ArrayList();
                arrayList.add(CompletableFuture.completedFuture(epochRecord));
                return Futures.allOfWithResults((List) historyTimeSeries.getHistoryRecords().stream().filter(historyTimeSeriesRecord -> {
                    return historyTimeSeriesRecord.getEpoch() > i2 && historyTimeSeriesRecord.getEpoch() <= i3;
                }).reduce(arrayList, (list, historyTimeSeriesRecord2) -> {
                    CompletableFuture<EpochRecord> newEpochRecord = newEpochRecord((CompletableFuture) list.get(list.size() - 1), historyTimeSeriesRecord2.getEpoch(), historyTimeSeriesRecord2.getReferenceEpoch(), historyTimeSeriesRecord2.getSegmentsCreated(), (Collection) historyTimeSeriesRecord2.getSegmentsSealed().stream().map((v0) -> {
                        return v0.segmentId();
                    }).collect(Collectors.toList()), historyTimeSeriesRecord2.getScaleTime());
                    ArrayList arrayList2 = new ArrayList(list);
                    arrayList2.add(newEpochRecord);
                    return arrayList2;
                }, (list2, list3) -> {
                    ArrayList arrayList2 = new ArrayList(list2);
                    arrayList2.addAll(list3);
                    return arrayList2;
                }));
            });
        });
    }

    private CompletableFuture<EpochRecord> newEpochRecord(CompletableFuture<EpochRecord> completableFuture, int i, int i2, Collection<StreamSegmentRecord> collection, Collection<Long> collection2, long j) {
        return i == i2 ? completableFuture.thenApply(epochRecord -> {
            if (!$assertionsDisabled && epochRecord.getEpoch() != i - 1) {
                throw new AssertionError();
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            epochRecord.getSegments().forEach(streamSegmentRecord -> {
                if (collection2.contains(Long.valueOf(streamSegmentRecord.segmentId()))) {
                    return;
                }
                builder.add(streamSegmentRecord);
            });
            builder.addAll(collection);
            return new EpochRecord(i, i2, builder.build(), j);
        }) : getEpochRecord(i);
    }

    private StreamSegmentRecord newSegmentRecord(long j, long j2, Double d, Double d2) {
        return newSegmentRecord(NameUtils.getEpoch(j), NameUtils.getSegmentNumber(j), j2, d, d2);
    }

    private StreamSegmentRecord newSegmentRecord(int i, int i2, long j, Double d, Double d2) {
        return StreamSegmentRecord.builder().creationEpoch(i).segmentNumber(i2).creationTime(j).keyStart(d.doubleValue()).keyEnd(d2.doubleValue()).m200build();
    }

    @VisibleForTesting
    CompletableFuture<Integer> findEpochAtTime(long j, boolean z) {
        return getActiveEpoch(z).thenCompose(epochRecord -> {
            return searchEpochAtTime(0, epochRecord.getEpoch() / this.historyChunkSize.get(), num -> {
                return num.intValue() == epochRecord.getEpoch() / this.historyChunkSize.get();
            }, j).thenApply(num2 -> {
                if (num2.intValue() != -1) {
                    return num2;
                }
                if (j > epochRecord.getCreationTime()) {
                    return Integer.valueOf(epochRecord.getEpoch());
                }
                return 0;
            });
        });
    }

    private CompletableFuture<Integer> searchEpochAtTime(int i, int i2, Predicate<Integer> predicate, long j) {
        int i3 = (i + i2) / 2;
        return i > i2 ? CompletableFuture.completedFuture(-1) : getHistoryTimeSeriesChunk(i3, predicate.test(Integer.valueOf(i3))).thenCompose(historyTimeSeries -> {
            ImmutableList<HistoryTimeSeriesRecord> historyRecords = historyTimeSeries.getHistoryRecords();
            long scaleTime = ((HistoryTimeSeriesRecord) historyRecords.get(0)).getScaleTime();
            long scaleTime2 = ((HistoryTimeSeriesRecord) historyRecords.get(historyRecords.size() - 1)).getScaleTime();
            if (j < scaleTime || j > scaleTime2) {
                return j < scaleTime ? searchEpochAtTime(i, i3 - 1, predicate, j) : searchEpochAtTime(i3 + 1, i2, predicate, j);
            }
            int findGreatestLowerBound = CollectionHelpers.findGreatestLowerBound(historyRecords, historyTimeSeriesRecord -> {
                return Integer.valueOf(Long.compare(j, historyTimeSeriesRecord.getScaleTime()));
            });
            if ($assertionsDisabled || findGreatestLowerBound >= 0) {
                return CompletableFuture.completedFuture(Integer.valueOf(((HistoryTimeSeriesRecord) historyRecords.get(findGreatestLowerBound)).getEpoch()));
            }
            throw new AssertionError();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<HistoryTimeSeries> getHistoryTimeSeriesChunk(int i) {
        return getHistoryTimeSeriesChunk(i, true);
    }

    private CompletableFuture<HistoryTimeSeries> getHistoryTimeSeriesChunk(int i, boolean z) {
        return getHistoryTimeSeriesChunkData(i, z).thenCompose(versionedMetadata -> {
            HistoryTimeSeries historyTimeSeries = (HistoryTimeSeries) versionedMetadata.getObject();
            return (z || historyTimeSeries.getHistoryRecords().size() >= this.historyChunkSize.get()) ? CompletableFuture.completedFuture(historyTimeSeries) : getHistoryTimeSeriesChunk(i, true);
        });
    }

    abstract CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j, int i);

    abstract CompletableFuture<Void> createStreamMetadata();

    abstract CompletableFuture<Void> storeCreationTimeIfAbsent(long j);

    abstract CompletableFuture<Void> deleteStream();

    abstract CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord streamConfigurationRecord);

    abstract CompletableFuture<Version> setConfigurationData(VersionedMetadata<StreamConfigurationRecord> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getConfigurationData(boolean z);

    abstract CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord streamTruncationRecord);

    abstract CompletableFuture<Version> setTruncationData(VersionedMetadata<StreamTruncationRecord> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationData(boolean z);

    abstract CompletableFuture<Void> createStateIfAbsent(StateRecord stateRecord);

    abstract CompletableFuture<Version> setStateData(VersionedMetadata<StateRecord> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<StateRecord>> getStateData(boolean z);

    abstract CompletableFuture<Void> createRetentionSetDataIfAbsent(RetentionSet retentionSet);

    abstract CompletableFuture<Void> createStreamCutRecordData(long j, StreamCutRecord streamCutRecord);

    abstract CompletableFuture<VersionedMetadata<StreamCutRecord>> getStreamCutRecordData(long j);

    abstract CompletableFuture<Void> deleteStreamCutRecordData(long j);

    abstract CompletableFuture<Version> updateRetentionSetData(VersionedMetadata<RetentionSet> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<RetentionSet>> getRetentionSetData();

    abstract CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int i, HistoryTimeSeries historyTimeSeries);

    abstract CompletableFuture<VersionedMetadata<HistoryTimeSeries>> getHistoryTimeSeriesChunkData(int i, boolean z);

    abstract CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int i, VersionedMetadata<HistoryTimeSeries> versionedMetadata);

    abstract CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(EpochRecord epochRecord);

    abstract CompletableFuture<Version> updateCurrentEpochRecordData(VersionedMetadata<EpochRecord> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<EpochRecord>> getCurrentEpochRecordData(boolean z);

    abstract CompletableFuture<Void> createEpochRecordDataIfAbsent(int i, EpochRecord epochRecord);

    abstract CompletableFuture<VersionedMetadata<EpochRecord>> getEpochRecordData(int i);

    abstract CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int i, SealedSegmentsMapShard sealedSegmentsMapShard);

    abstract CompletableFuture<VersionedMetadata<SealedSegmentsMapShard>> getSealedSegmentSizesMapShardData(int i);

    abstract CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int i, VersionedMetadata<SealedSegmentsMapShard> versionedMetadata);

    abstract CompletableFuture<Void> createSegmentSealedEpochRecords(Collection<Long> collection, int i);

    abstract CompletableFuture<VersionedMetadata<Integer>> getSegmentSealedRecordData(long j);

    abstract CompletableFuture<Version> createNewTransaction(int i, UUID uuid, ActiveTxnRecord activeTxnRecord);

    abstract CompletableFuture<VersionedMetadata<ActiveTxnRecord>> getActiveTx(int i, UUID uuid);

    abstract CompletableFuture<Version> updateActiveTx(int i, UUID uuid, VersionedMetadata<ActiveTxnRecord> versionedMetadata);

    abstract CompletableFuture<Long> addTxnToCommitOrder(UUID uuid);

    abstract CompletableFuture<Void> removeTxnsFromCommitOrder(List<Long> list);

    abstract CompletableFuture<VersionedMetadata<CompletedTxnRecord>> getCompletedTx(UUID uuid);

    abstract CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid);

    abstract CompletableFuture<Void> createCompletedTxEntry(UUID uuid, CompletedTxnRecord completedTxnRecord);

    abstract CompletableFuture<Map<UUID, ActiveTxnRecord>> getTxnInEpoch(int i);

    abstract CompletableFuture<List<Map.Entry<UUID, ActiveTxnRecord>>> getOrderedCommittingTxnInLowestEpoch(int i);

    @VisibleForTesting
    abstract CompletableFuture<Map<Long, UUID>> getAllOrderedCommittingTxns();

    abstract CompletableFuture<Void> createMarkerData(long j, long j2);

    abstract CompletableFuture<Version> updateMarkerData(long j, VersionedMetadata<Long> versionedMetadata);

    abstract CompletableFuture<Void> removeMarkerData(long j);

    abstract CompletableFuture<VersionedMetadata<Long>> getMarkerData(long j);

    abstract CompletableFuture<Void> createEpochTransitionIfAbsent(EpochTransitionRecord epochTransitionRecord);

    abstract CompletableFuture<Version> updateEpochTransitionNode(VersionedMetadata<EpochTransitionRecord> versionedMetadata);

    abstract CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransitionNode();

    abstract CompletableFuture<Void> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord committingTransactionsRecord);

    abstract CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getCommitTxnRecord();

    abstract CompletableFuture<Version> updateCommittingTxnRecord(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata);

    abstract CompletableFuture<Void> createWaitingRequestNodeIfAbsent(String str);

    abstract CompletableFuture<String> getWaitingRequestNode();

    abstract CompletableFuture<Void> deleteWaitingRequestNode();

    abstract CompletableFuture<Void> createWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap);

    abstract CompletableFuture<VersionedMetadata<WriterMark>> getWriterMarkRecord(String str);

    abstract CompletableFuture<Void> updateWriterMarkRecord(String str, long j, ImmutableMap<Long, Long> immutableMap, boolean z, Version version);

    abstract CompletableFuture<Void> removeWriterRecord(String str, Version version);

    static {
        $assertionsDisabled = !PersistentStreamBase.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PersistentStreamBase.class);
    }
}
