package io.pravega.controller.store.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.common.util.CollectionHelpers;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.ActiveTxnRecord;
import io.pravega.controller.store.stream.records.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.records.CompletedTxnRecord;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.EpochTransitionRecord;
import io.pravega.controller.store.stream.records.HistoryTimeSeries;
import io.pravega.controller.store.stream.records.HistoryTimeSeriesRecord;
import io.pravega.controller.store.stream.records.RecordHelper;
import io.pravega.controller.store.stream.records.RetentionSet;
import io.pravega.controller.store.stream.records.SealedSegmentsMapShard;
import io.pravega.controller.store.stream.records.StateRecord;
import io.pravega.controller.store.stream.records.StreamConfigurationRecord;
import io.pravega.controller.store.stream.records.StreamCutRecord;
import io.pravega.controller.store.stream.records.StreamCutReferenceRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.StreamTruncationRecord;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase.class */
public abstract class PersistentStreamBase implements Stream {

    // Logger shared by every instance. It is declared final with no initializer
    // here, so it has to be assigned in a static initializer outside this view.
    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    // Scope that owns this stream; assigned once by the constructor.
    private final String scope;
    // Stream name; assigned once by the constructor.
    private final String name;
    // Wraps the constructor's third argument.
    // NOTE(review): presumably the number of records per history chunk - confirm against usages.
    private final AtomicInteger historyChunkSize;
    // Wraps the constructor's fourth argument.
    // NOTE(review): presumably the number of segments per sealed-segment size shard - confirm against usages.
    private final AtomicInteger shardSize;
    // Decompiler rendering of the compiler-generated assertion flag: the hand-expanded
    // assertion checks in this class only fire while this flag is false.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: io.pravega.controller.store.stream.PersistentStreamBase$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase$1.class */
    /**
     * Synthetic lookup table generated for a {@code switch} over {@link TxnStatus}.
     * The array is indexed by {@code TxnStatus.ordinal()} and stores a dense case number
     * (1 to 6); slots that are never assigned keep the array default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per TxnStatus constant known when this class is initialised.
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$TxnStatus = new int[TxnStatus.values().length];

        static {
            // Every constant is registered in its own try block, so a NoSuchFieldError raised
            // for one constant does not stop the remaining slots from being filled in.
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot simply stays 0.
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.OPEN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.UNKNOWN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Initialises the scope, the stream name and the two size counters.
     *
     * @param str  scope the stream belongs to
     * @param str2 name of the stream
     * @param i    initial value of the history chunk size counter
     * @param i2   initial value of the shard size counter
     */
    public PersistentStreamBase(String str, String str2, int i, int i2) {
        scope = str;
        name = str2;
        historyChunkSize = new AtomicInteger(i);
        shardSize = new AtomicInteger(i2);
    }

    /**
     * @return the scope supplied at construction time
     */
    @Override // io.pravega.controller.store.stream.Stream
    public String getScope() {
        return scope;
    }

    /**
     * @return the stream name supplied at construction time
     */
    @Override // io.pravega.controller.store.stream.Stream
    public String getName() {
        return name;
    }

    /**
     * @return the scope supplied at construction time; in this class the value is
     *         identical to the one returned by {@link #getScope()}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public String getScopeName() {
        return scope;
    }

    /**
     * Creates the stream's metadata records one after another.
     *
     * <p>After the scope and stream checks succeed, the steps run strictly in sequence:
     * creation time, configuration, epoch transition, truncation record, committing
     * transactions record, state ({@code CREATING}) and finally the history records.
     * Each step goes through an {@code ...IfAbsent} helper.
     *
     * @param streamConfiguration configuration forwarded to {@code checkStreamExists}
     * @param j                   forwarded to {@code checkStreamExists}
     *                            (NOTE(review): presumably the creation timestamp - confirm)
     * @param i                   forwarded to {@code checkStreamExists} and used as the base
     *                            segment number when the initial segments are created
     * @return a future holding the response produced by {@code checkStreamExists}; it
     *         completes once every record has been written
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<CreateStreamResponse> create(StreamConfiguration streamConfiguration, long j, int i) {
        // The decompiler-inserted casts to Function<? super U, ...> referenced an undeclared
        // type variable; plain lambdas let the compiler infer the types.
        return checkScopeExists()
                .thenCompose(scopeChecked -> checkStreamExists(streamConfiguration, j, i))
                .thenCompose(createStreamResponse -> storeCreationTimeIfAbsent(createStreamResponse.getTimestamp())
                        .thenCompose(done -> createConfigurationIfAbsent(
                                StreamConfigurationRecord.complete(createStreamResponse.getConfiguration()).toBytes()))
                        .thenCompose(done -> createEpochTransitionIfAbsent(EpochTransitionRecord.EMPTY.toBytes()))
                        .thenCompose(done -> createTruncationDataIfAbsent(StreamTruncationRecord.EMPTY.toBytes()))
                        .thenCompose(done -> createCommitTxnRecordIfAbsent(CommittingTransactionsRecord.EMPTY.toBytes()))
                        .thenCompose(done -> createStateIfAbsent(
                                StateRecord.builder().state(State.CREATING).m122build().toBytes()))
                        .thenCompose(done -> createHistoryRecords(i, createStreamResponse))
                        .thenApply(done -> createStreamResponse));
    }

    /**
     * Writes the epoch 0 records of a new stream: the epoch record, the first history
     * chunk, sealed-segment size shard 0, an empty retention set and the current-epoch record.
     *
     * <p>The key space [0, 1) is divided evenly between the initial segments; segment
     * {@code k} covers {@code [k / n, (k + 1) / n)}, where {@code n} is the minimum segment
     * count of the scaling policy.
     *
     * @param i                    base segment number; the k-th segment gets number {@code i + k}
     * @param createStreamResponse source of the configuration and the creation timestamp
     * @return a stage that completes once all records have been written
     */
    private CompletionStage<Void> createHistoryRecords(int i, CreateStreamResponse createStreamResponse) {
        int minNumSegments = createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments();
        double keyRangeWidth = 1.0d / minNumSegments;
        long timestamp = createStreamResponse.getTimestamp();
        EpochRecord firstEpoch = EpochRecord.builder()
                .epoch(0)
                .referenceEpoch(0)
                .segments((List) IntStream.range(0, minNumSegments)
                        .mapToObj(index -> newSegmentRecord(0, i + index, timestamp,
                                Double.valueOf(index * keyRangeWidth), Double.valueOf((index + 1) * keyRangeWidth)))
                        .collect(Collectors.toList()))
                .creationTime(timestamp)
                .m100build();
        // The decompiler-inserted cast to Function<? super U, ...> referenced an undeclared
        // type variable and has been dropped.
        return createEpochRecord(firstEpoch)
                .thenCompose(done -> createHistoryChunk(firstEpoch))
                .thenCompose(done -> createSealedSegmentSizeMapShardIfAbsent(0))
                .thenCompose(done -> createRetentionSetDataIfAbsent(
                        RetentionSet.builder().retentionRecords(Collections.emptyList()).m113build().toBytes()))
                .thenCompose(done -> createCurrentEpochRecordDataIfAbsent(firstEpoch.toBytes()));
    }

    /**
     * Stores history chunk 0 with a single entry describing epoch 0: every segment of the
     * supplied epoch is listed as created and none as sealed.
     *
     * @param epochRecord the epoch whose segments and creation time seed the entry
     * @return a stage that completes once the chunk has been written
     */
    private CompletionStage<Void> createHistoryChunk(EpochRecord epochRecord) {
        HistoryTimeSeriesRecord firstEntry = HistoryTimeSeriesRecord.builder()
                .epoch(0)
                .referenceEpoch(0)
                .segmentsCreated(epochRecord.getSegments())
                .segmentsSealed(Collections.emptyList())
                .creationTime(epochRecord.getCreationTime())
                .m109build();
        return createHistoryTimeSeriesChunk(0, firstEntry);
    }

    /**
     * Serialises a history time series holding just the given record and stores it as
     * chunk {@code i} through the if-absent helper.
     *
     * @param i                       chunk number to create
     * @param historyTimeSeriesRecord the only record placed in the new chunk
     * @return a future that completes once the chunk has been written
     */
    private CompletableFuture<Void> createHistoryTimeSeriesChunk(int i, HistoryTimeSeriesRecord historyTimeSeriesRecord) {
        return createHistoryTimeSeriesChunkDataIfAbsent(
                i,
                HistoryTimeSeries.builder()
                        .historyRecords(Lists.newArrayList(new HistoryTimeSeriesRecord[]{historyTimeSeriesRecord}))
                        .m106build()
                        .toBytes());
    }

    /**
     * Deletes the stream by delegating to {@code deleteStream()}.
     *
     * @return the future returned by {@code deleteStream()}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> delete() {
        return deleteStream();
    }

    /**
     * Begins truncating the stream at the supplied stream cut.
     *
     * <p>The current truncation record must not already be updating, the cut must pass
     * {@link #isStreamCutValid(Map)} and it must not lie behind the previous truncation
     * point. When every check passes, the newly computed truncation record is written
     * against the version that was read.
     *
     * @param map stream cut to truncate at (segment id to offset)
     * @return a future that completes when the new truncation record has been stored;
     *         it fails with an argument exception when one of the checks is violated
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startTruncation(Map<Long, Long> map) {
        return getTruncationRecord().thenCompose(existing -> {
            Preconditions.checkNotNull(existing);
            Preconditions.checkArgument(!existing.getObject().isUpdating());
            return isStreamCutValid(map).thenCompose(valid -> {
                Exceptions.checkArgument(valid.booleanValue(), "streamCut", "invalid stream cut", new Object[0]);
                return computeStreamCutSpan(map).thenCompose(span -> {
                    StreamTruncationRecord previous = existing.getObject();
                    boolean ahead = greaterThan(map, span, previous.getStreamCut(), previous.getSpan());
                    Exceptions.checkArgument(ahead, "StreamCut", "Supplied streamcut is behind previous truncation point", new Object[0]);
                    return computeTruncationRecord(previous, map, span)
                            .thenCompose(updated -> Futures.toVoid(
                                    setTruncationData(new Data(updated.toBytes(), existing.getVersion()))));
                });
            });
        });
    }

    /**
     * Checks that the first stream cut is not behind the second one.
     *
     * <p>The first cut is rejected when some segment of the second span either
     * <ul>
     *   <li>is the same segment as one in the first span while the first cut's offset for
     *       it is smaller than the second cut's offset, or</li>
     *   <li>overlaps a segment of the first span whose epoch is lower.</li>
     * </ul>
     *
     * <p>The decompiled form named both lambda parameters {@code entry}, which is illegal
     * and turned each comparison into an entry against itself. The two entries now have
     * distinct names. Which side is which follows from the caller, which rejects a cut that
     * is "behind previous truncation point".
     *
     * @param map  offsets of the first (candidate) cut, keyed by segment id
     * @param map2 span of the first cut: segment record to epoch
     * @param map3 offsets of the second (reference) cut, keyed by segment id
     * @param map4 span of the second cut: segment record to epoch
     * @return {@code true} when no segment of the second span is ahead of the first cut
     */
    private boolean greaterThan(Map<Long, Long> map, Map<StreamSegmentRecord, Integer> map2, Map<Long, Long> map3, Map<StreamSegmentRecord, Integer> map4) {
        return map2.entrySet().stream().allMatch(candidate -> {
            StreamSegmentRecord candidateSegment = candidate.getKey();
            return map4.entrySet().stream().noneMatch(reference -> {
                StreamSegmentRecord referenceSegment = reference.getKey();
                // Same segment in both cuts: compare the offsets.
                if (referenceSegment.segmentId() == candidateSegment.segmentId()
                        && map.get(candidateSegment.segmentId()) < map3.get(referenceSegment.segmentId())) {
                    return true;
                }
                // Overlapping key ranges: compare the epochs recorded in the spans.
                return referenceSegment.overlaps(candidateSegment)
                        && candidate.getValue() < reference.getValue();
            });
        });
    }

    /**
     * Builds the truncation record that follows {@code streamTruncationRecord} for a new
     * stream cut.
     *
     * <p>The segments between the previous span (or epoch 0 when the previous span is
     * empty) and the new span are collected. Those not present in the new cut become the
     * segments to delete, and the size between the two cuts is added to the previous
     * {@code sizeTill}. The result is created with its last constructor argument set to
     * {@code true}.
     *
     * <p>The decompiled form reused the name {@code l} for both the size and the filter
     * lambda parameter, which is illegal; the two now have distinct names.
     *
     * @param streamTruncationRecord the previous truncation record
     * @param map                    the new stream cut (segment id to offset)
     * @param map2                   span of the new stream cut (segment record to epoch)
     * @return a future holding the new truncation record
     */
    private CompletableFuture<StreamTruncationRecord> computeTruncationRecord(StreamTruncationRecord streamTruncationRecord, Map<Long, Long> map, Map<StreamSegmentRecord, Integer> map2) {
        log.debug("computing truncation for stream {}/{}", this.scope, this.name);
        CompletableFuture<Map<StreamSegmentRecord, Integer>> previousSpan = streamTruncationRecord.getSpan().isEmpty()
                ? getEpochRecord(0).thenApply(epochRecord -> convertToSpan(epochRecord))
                : CompletableFuture.completedFuture(streamTruncationRecord.getSpan());
        return previousSpan
                .thenCompose(spanFrom -> segmentsBetweenStreamCutSpans(spanFrom, map2))
                .thenCompose(segmentsBetween -> sizeBetweenStreamCuts(streamTruncationRecord.getStreamCut(), map, segmentsBetween)
                        .thenApply(sizeBetween -> {
                            // Segments that are still part of the new cut must not be deleted.
                            Set<Long> toDelete = segmentsBetween.stream()
                                    .map(StreamSegmentRecord::segmentId)
                                    .filter(segmentId -> !map.containsKey(segmentId))
                                    .collect(Collectors.toSet());
                            return new StreamTruncationRecord(map, map2, streamTruncationRecord.getDeletedSegments(),
                                    toDelete, streamTruncationRecord.getSizeTill() + sizeBetween.longValue(), true);
                        }));
    }

    /**
     * Finishes an in-progress truncation by storing the completed form of the record
     * against the supplied version.
     *
     * @param versionedMetadata truncation record plus the version it was read at; must be
     *                          non-null and flagged as updating
     * @return a future that completes when the completed record has been stored
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeTruncation(VersionedMetadata<StreamTruncationRecord> versionedMetadata) {
        Preconditions.checkNotNull(versionedMetadata);
        Preconditions.checkArgument(versionedMetadata.getObject().isUpdating());
        StreamTruncationRecord current = versionedMetadata.getObject();
        if (!current.isUpdating()) {
            return CompletableFuture.completedFuture(null);
        }
        Data completed = new Data(StreamTruncationRecord.complete(current).toBytes(), versionedMetadata.getVersion());
        return Futures.toVoid(setTruncationData(completed));
    }

    /**
     * Reads the truncation data (passing {@code true} to {@code getTruncationData}) and
     * deserialises it.
     *
     * @return a future holding the truncation record together with its version
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<StreamTruncationRecord>> getTruncationRecord() {
        return getTruncationData(true)
                .thenApply(data -> new VersionedMetadata<>(StreamTruncationRecord.fromBytes(data.getData()), data.getVersion()));
    }

    /**
     * Begins a configuration update by storing an "update" configuration record against
     * the version that was read.
     *
     * @param streamConfiguration the configuration to move to
     * @return a future that completes when the record has been stored; it fails with an
     *         {@link IllegalArgumentException} when an update is already in progress
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startUpdateConfiguration(StreamConfiguration streamConfiguration) {
        return getVersionedConfigurationRecord().thenCompose(existing -> {
            Preconditions.checkArgument(!existing.getObject().isUpdating());
            Data pending = new Data(StreamConfigurationRecord.update(streamConfiguration).toBytes(), existing.getVersion());
            return Futures.toVoid(setConfigurationData(pending));
        });
    }

    /**
     * Finishes a configuration update by storing the "complete" form of the record.
     * Nothing is written when the record is not flagged as updating.
     *
     * @param versionedMetadata configuration record plus the version it was read at
     * @return a future that completes when the record has been stored, or immediately
     *         when there is nothing to complete
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeUpdateConfiguration(VersionedMetadata<StreamConfigurationRecord> versionedMetadata) {
        StreamConfigurationRecord current = versionedMetadata.getObject();
        Preconditions.checkNotNull(current);
        if (current.isUpdating()) {
            StreamConfigurationRecord completed = StreamConfigurationRecord.complete(current.getStreamConfiguration());
            log.debug("Completing update configuration for stream {}/{}", this.scope, this.name);
            return Futures.toVoid(setConfigurationData(new Data(completed.toBytes(), versionedMetadata.getVersion())));
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Reads the configuration data (passing {@code false} to {@code getConfigurationData})
     * and extracts the stream configuration from the stored record.
     *
     * @return a future holding the stream configuration
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        return getConfigurationData(false)
                .thenApply(data -> StreamConfigurationRecord.fromBytes(data.getData()).getStreamConfiguration());
    }

    /**
     * Reads the configuration data (passing {@code true} to {@code getConfigurationData})
     * and deserialises it.
     *
     * @return a future holding the configuration record together with its version
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<StreamConfigurationRecord>> getVersionedConfigurationRecord() {
        return getConfigurationData(true)
                .thenApply(data -> new VersionedMetadata<>(StreamConfigurationRecord.fromBytes(data.getData()), data.getVersion()));
    }

    /**
     * Reads the current state with its version and attempts to move the stream to
     * {@code state} through {@link #updateVersionedState}.
     *
     * @param state the state to move to
     * @return a future that completes when the state has been stored; it fails when the
     *         transition is not allowed
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> updateState(State state) {
        return getStateData(true).thenCompose(data -> {
            VersionedMetadata<State> current =
                    new VersionedMetadata<>(StateRecord.fromBytes(data.getData()).getState(), data.getVersion());
            return Futures.toVoid(updateVersionedState(current, state));
        });
    }

    /**
     * Reads the state data (passing {@code true} to {@code getStateData}) and
     * deserialises it.
     *
     * @return a future holding the state together with the version of the stored record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<State>> getVersionedState() {
        return getStateData(true)
                .thenApply(data -> new VersionedMetadata<>(StateRecord.fromBytes(data.getData()).getState(), data.getVersion()));
    }

    /**
     * Stores {@code state} against the supplied version when
     * {@code State.isTransitionAllowed} permits the move from the current state.
     *
     * @param versionedMetadata the current state and the version it was read at
     * @param state             the state to move to
     * @return a future holding the new state with the version returned by the store, or a
     *         future failed with an {@code OPERATION_NOT_ALLOWED} store exception when the
     *         transition is not permitted
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<State>> updateVersionedState(VersionedMetadata<State> versionedMetadata, State state) {
        if (!State.isTransitionAllowed(versionedMetadata.getObject(), state)) {
            return Futures.failedFuture(StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Stream: " + getName() + " State: " + state.name() + " current state = " + versionedMetadata.getObject()));
        }
        Data update = new Data(StateRecord.builder().state(state).m122build().toBytes(), versionedMetadata.getVersion());
        return setStateData(update).thenApply(version -> new VersionedMetadata<>(state, version));
    }

    /**
     * Reads the state data and returns just the state.
     *
     * @param z forwarded unchanged to {@code getStateData}
     *          (NOTE(review): presumably selects whether a cached value may be used - confirm)
     * @return a future holding the stream state
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<State> getState(boolean z) {
        return getStateData(z).thenApply(data -> StateRecord.fromBytes(data.getData()).getState());
    }

    /**
     * Looks a segment up in the epoch record of the epoch encoded in its id.
     *
     * @param j segment id
     * @return a future holding the segment; it fails with a {@code DATA_NOT_FOUND} store
     *         exception when that epoch has no segment with this id
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Segment> getSegment(long j) {
        return getEpochRecord(StreamSegmentNameUtils.getEpoch(j)).thenApply(epochRecord -> {
            StreamSegmentRecord match = epochRecord.getSegments().stream()
                    .filter(candidate -> candidate.segmentId() == j)
                    .findAny()
                    .orElseThrow(() -> StoreException.create(StoreException.Type.DATA_NOT_FOUND, "segment not found in epoch"));
            return transform(match);
        });
    }

    /**
     * Returns scale metadata for every epoch between the epochs found for the two
     * supplied times.
     *
     * @param j  lower bound passed to {@code findEpochAtTime}
     * @param j2 upper bound passed to {@code findEpochAtTime}
     * @return a future holding one {@link ScaleMetadata} entry per fetched epoch
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata(long j, long j2) {
        CompletableFuture<Integer> fromEpoch = findEpochAtTime(j, false);
        CompletableFuture<Integer> toEpoch = findEpochAtTime(j2, false);
        // The decompiler-inserted cast to Function<? super U, ? extends U> referenced an
        // undeclared type variable; the bare method reference is enough.
        return CompletableFuture.allOf(fromEpoch, toEpoch)
                // Both lookups are complete here, so join() does not block.
                .thenCompose(bothDone -> fetchEpochs(fromEpoch.join(), toEpoch.join(), false))
                .thenApply(this::mapToScaleMetadata);
    }

    /**
     * Converts epoch records, in list order, into scale metadata.
     *
     * <p>For each epoch two counts are computed against the preceding epoch in the list
     * via {@code findSegmentSplitsMerges}: first with (previous, current) and then with
     * (current, previous). The first epoch has no predecessor and reports 0 for both.
     *
     * @param list epoch records in the order they should be compared
     * @return one {@link ScaleMetadata} per epoch record
     */
    private List<ScaleMetadata> mapToScaleMetadata(List<EpochRecord> list) {
        // Remembers the segments of the epoch handled just before the current one.
        AtomicReference<List<StreamSegmentRecord>> previous = new AtomicReference<>();
        return list.stream().map(epochRecord -> {
            List<StreamSegmentRecord> current = epochRecord.getSegments();
            List<StreamSegmentRecord> before = previous.get();
            long previousToCurrent = 0;
            long currentToPrevious = 0;
            if (before != null) {
                previousToCurrent = findSegmentSplitsMerges(before, current);
                currentToPrevious = findSegmentSplitsMerges(current, before);
            }
            previous.set(current);
            return new ScaleMetadata(epochRecord.getCreationTime(), transform(current), previousToCurrent, currentToPrevious);
        }).collect(Collectors.toList());
    }

    /**
     * Counts the segments of {@code list} that overlap more than one segment of
     * {@code list2}.
     *
     * <p>The decompiled form named both lambda parameters {@code streamSegmentRecord},
     * which is illegal and reads as a segment overlapping itself. The inner parameter is
     * now distinct: each segment of {@code list2} is tested against the current segment of
     * {@code list}.
     *
     * @param list  segments to examine
     * @param list2 segments to test each examined segment against
     * @return number of segments in {@code list} overlapping at least two segments of
     *         {@code list2}
     */
    private long findSegmentSplitsMerges(List<StreamSegmentRecord> list, List<StreamSegmentRecord> list2) {
        return list.stream()
                .filter(reference -> list2.stream().filter(target -> target.overlaps(reference)).count() > 1)
                .count();
    }

    /**
     * Reads the epoch stored in the sealed-segment record of a segment.
     *
     * @param j segment id
     * @return a future holding the int read at offset 0 of the record, or {@code -1} when
     *         the lookup failed with a data-not-found store exception; any other failure
     *         is rethrown wrapped in a {@link CompletionException}
     */
    private CompletableFuture<Integer> getSegmentSealedEpoch(long j) {
        return getSegmentSealedRecordData(j).handle((data, th) -> {
            if (th != null) {
                if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                    return -1;
                }
                throw new CompletionException(th);
            }
            return Integer.valueOf(BitConverter.readInt(data.getData(), 0));
        });
    }

    /**
     * Collects the ids of all segments between the head of the stream and the active epoch.
     *
     * <p>The head span is the span of the truncation record, or the span of epoch 0 when
     * the truncation record equals {@code StreamTruncationRecord.EMPTY}.
     *
     * @return a future holding the set of segment ids
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Set<Long>> getAllSegmentIds() {
        CompletableFuture<Map<StreamSegmentRecord, Integer>> headSpan = getTruncationRecord().thenCompose(versionedMetadata -> {
            StreamTruncationRecord truncation = versionedMetadata.getObject();
            if (truncation.equals(StreamTruncationRecord.EMPTY)) {
                return getEpochRecord(0).thenApply(this::convertToSpan);
            }
            return CompletableFuture.completedFuture(truncation.getSpan());
        });
        CompletableFuture<Map<StreamSegmentRecord, Integer>> tailSpan = getActiveEpoch(true).thenApply(this::convertToSpan);
        return CompletableFuture.allOf(headSpan, tailSpan)
                // Both spans are available here, so join() does not block.
                .thenCompose(bothDone -> segmentsBetweenStreamCutSpans(headSpan.join(), tailSpan.join()))
                .thenApply(segments -> segments.stream()
                        .map(StreamSegmentRecord::segmentId)
                        .collect(Collectors.toSet()));
    }

    /**
     * Finds the successors of a sealed segment and, for each successor, its predecessors.
     *
     * <p>Let {@code e} be the epoch in which the segment was sealed. Successors are the
     * segments of epoch {@code e} that overlap the segment; the predecessors of a successor
     * are the segments of epoch {@code e - 1} that overlap that successor. When the segment
     * has no sealed record the result is an empty map.
     *
     * <p>The decompiled form named both nested lambda parameters
     * {@code streamSegmentRecord4}, which is illegal and reads as a segment overlapping
     * itself. Raw {@code (List)}/{@code (Map)} casts also erased the element types. Both are
     * repaired here, following the element-overlaps-captured pattern of the successor filter.
     *
     * @param j id of the segment whose successors are wanted
     * @return a future holding a map from successor segment to the ids of its predecessors
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<Segment, List<Long>>> getSuccessorsWithPredecessors(long j) {
        return getSegmentSealedEpoch(j).thenCompose(sealedEpochNumber -> {
            if (sealedEpochNumber.intValue() < 0) {
                // Not sealed: still fetch the active epoch, as before, but report no successors.
                return getActiveEpoch(true).thenApply(activeEpoch -> Collections.emptyMap());
            }
            CompletableFuture<EpochRecord> sealedEpochFuture = getEpochRecord(sealedEpochNumber.intValue());
            CompletableFuture<EpochRecord> previousEpochFuture = getEpochRecord(sealedEpochNumber.intValue() - 1);
            return CompletableFuture.allOf(sealedEpochFuture, previousEpochFuture).thenApply(bothDone -> {
                EpochRecord sealedEpoch = sealedEpochFuture.join();
                EpochRecord previousEpoch = previousEpochFuture.join();
                Optional<StreamSegmentRecord> found = previousEpoch.getSegments().stream()
                        .filter(candidate -> candidate.segmentId() == j)
                        .findAny();
                if (!$assertionsDisabled && !found.isPresent()) {
                    throw new AssertionError();
                }
                StreamSegmentRecord sealedSegment = found.get();
                List<StreamSegmentRecord> successors = sealedEpoch.getSegments().stream()
                        .filter(candidate -> candidate.overlaps(sealedSegment))
                        .collect(Collectors.toList());
                return successors.stream().collect(Collectors.toMap(this::transform,
                        successor -> previousEpoch.getSegments().stream()
                                .filter(predecessor -> predecessor.overlaps(successor))
                                .map(StreamSegmentRecord::segmentId)
                                .collect(Collectors.toList())));
            });
        });
    }

    /**
     * Reads the current-epoch record data and deserialises it.
     *
     * @param z forwarded unchanged to {@code getCurrentEpochRecordData}
     * @return a future holding the active epoch record
     */
    private CompletableFuture<EpochRecord> getActiveEpochRecord(boolean z) {
        return getCurrentEpochRecordData(z).thenApply(data -> EpochRecord.fromBytes(data.getData()));
    }

    /**
     * Returns the segments of the active epoch after {@code verifyLegalState()} has
     * completed successfully.
     *
     * @return a future holding the currently active segments
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Segment>> getActiveSegments() {
        return verifyLegalState()
                .thenCompose(verified -> getActiveEpochRecord(true))
                .thenApply(activeEpoch -> transform(activeEpoch.getSegments()));
    }

    /**
     * Returns the segments at the head of the stream together with their start offsets.
     *
     * <p>When the truncation record equals {@code StreamTruncationRecord.EMPTY} the head is
     * every segment of epoch 0 at offset 0. Otherwise it is the truncation stream cut, with
     * each segment id resolved to its record in the truncation span.
     *
     * @return a future holding a map from head segment to offset
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<Segment, Long>> getSegmentsAtHead() {
        return getTruncationRecord().thenCompose(versionedMetadata -> {
            StreamTruncationRecord truncation = versionedMetadata.getObject();
            if (truncation.equals(StreamTruncationRecord.EMPTY)) {
                return getSegmentsInEpoch(0).thenApply(segments -> segments.stream()
                        .collect(Collectors.toMap(segment -> segment, segment -> 0L)));
            }
            Map<Segment, Long> head = truncation.getStreamCut().entrySet().stream().collect(Collectors.toMap(
                    cutEntry -> transform(versionedMetadata.getObject().getSpan().keySet().stream()
                            .filter(spanSegment -> spanSegment.segmentId() == cutEntry.getKey().longValue())
                            .findFirst()
                            .get()),
                    Map.Entry::getValue));
            return CompletableFuture.completedFuture(head);
        });
    }

    /**
     * Returns the segments recorded for one epoch.
     *
     * @param i epoch number
     * @return a future holding the segments of that epoch
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Segment>> getSegmentsInEpoch(int i) {
        return getEpochRecord(i).thenApply(epoch -> transform(epoch.getSegments()));
    }

    /**
     * Returns the segments lying between two stream cuts as a list.
     *
     * @param map  starting stream cut (segment id to offset); empty means the start of the stream
     * @param map2 ending stream cut (segment id to offset); empty means the active epoch
     * @return a future holding a {@link LinkedList} of the segments in between
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Segment>> getSegmentsBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2) {
        return segmentsBetweenStreamCuts(map, map2)
                .thenApply(between -> new LinkedList<>(transform(between)));
    }

    /**
     * Resolves both stream cuts to spans and returns the segment records between them.
     *
     * <p>An empty starting cut stands for the span of epoch 0 and an empty ending cut for
     * the span of the active epoch. When both cuts are non-empty they are first checked
     * with {@code RecordHelper.streamCutComparator}.
     *
     * @param map  starting stream cut (segment id to offset)
     * @param map2 ending stream cut (segment id to offset)
     * @return a future holding the segment records between the two cuts; it fails with an
     *         {@link IllegalArgumentException} when the comparator check fails
     */
    private CompletableFuture<Set<StreamSegmentRecord>> segmentsBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2) {
        CompletableFuture<Map<StreamSegmentRecord, Integer>> spanFrom;
        if (map.isEmpty()) {
            spanFrom = getEpochRecord(0).thenApply(this::convertToSpan);
        } else {
            spanFrom = computeStreamCutSpan(map);
        }
        CompletableFuture<Map<StreamSegmentRecord, Integer>> spanTo;
        if (map2.isEmpty()) {
            spanTo = getActiveEpochRecord(true).thenApply(this::convertToSpan);
        } else {
            spanTo = computeStreamCutSpan(map2);
        }
        return CompletableFuture.allOf(spanFrom, spanTo).thenCompose(bothDone -> {
            boolean bothSupplied = !(map.isEmpty() || map2.isEmpty());
            if (bothSupplied) {
                Preconditions.checkArgument(RecordHelper.streamCutComparator(map2, spanTo.join(), map, spanFrom.join()));
            }
            return segmentsBetweenStreamCutSpans(spanFrom.join(), spanTo.join());
        });
    }

    /**
     * Collects every segment record lying between two stream-cut spans.
     *
     * <p>Epochs from the lowest epoch of the first span to the highest epoch of the second
     * span are fetched. Every segment of an epoch that lies within [highest epoch of the
     * first span, lowest epoch of the second span] is included. In any other epoch a
     * segment is included only if its id is not lower than that of any overlapping segment
     * of the first span and not higher than that of any overlapping segment of the second
     * span.
     *
     * <p>The decompiled form reused the name {@code streamSegmentRecord2} for the filter
     * lambda parameter inside the enclosing lambda, which is illegal and reads as a segment
     * overlapping itself. The parameters now have distinct names, matching the test against
     * the second span.
     *
     * @param map  span of the starting stream cut (segment record to epoch); must not be empty
     * @param map2 span of the ending stream cut (segment record to epoch); must not be empty
     * @return a future holding the segment records between the two spans
     */
    @VisibleForTesting
    CompletableFuture<Set<StreamSegmentRecord>> segmentsBetweenStreamCutSpans(Map<StreamSegmentRecord, Integer> map, Map<StreamSegmentRecord, Integer> map2) {
        int toLow = Collections.min(map2.values());
        int toHigh = Collections.max(map2.values());
        int fromLow = Collections.min(map.values());
        int fromHigh = Collections.max(map.values());
        Set<StreamSegmentRecord> between = new HashSet<>();
        return fetchEpochs(fromLow, toHigh, true).thenAccept(epochs -> epochs.forEach(epochRecord -> {
            if (epochRecord.getEpoch() < fromHigh || epochRecord.getEpoch() > toLow) {
                // The epoch straddles one of the cuts, so each segment is checked individually.
                epochRecord.getSegments().stream()
                        .filter(candidate -> !between.contains(candidate))
                        .forEach(candidate -> {
                            boolean notBeforeFrom = map.keySet().stream()
                                    .filter(from -> from.overlaps(candidate))
                                    .allMatch(from -> from.segmentId() <= candidate.segmentId());
                            boolean notAfterTo = map2.keySet().stream()
                                    .filter(to -> to.overlaps(candidate))
                                    .allMatch(to -> candidate.segmentId() <= to.segmentId());
                            if (notBeforeFrom && notAfterTo) {
                                between.add(candidate);
                            }
                        });
            } else {
                // The epoch lies entirely between the two cuts.
                between.addAll(epochRecord.getSegments());
            }
        })).thenApply(done -> between);
    }

    /**
     * Computes the number of bytes lying between two stream cuts.
     *
     * <p>The sealed size of each supplied segment is looked up in its sealed-segment size
     * shard; {@code Long.MIN_VALUE} stands in when the shard has no size for it. Each
     * segment then contributes:
     * <ul>
     *   <li>in both cuts: the ending offset minus the starting offset;</li>
     *   <li>only in the ending cut: the ending offset;</li>
     *   <li>only in the starting cut: the sealed size minus the starting offset;</li>
     *   <li>in neither cut: the whole sealed size.</li>
     * </ul>
     * The last two cases assert that the sealed size is known (non-negative).
     *
     * @param map  starting stream cut (segment id to offset)
     * @param map2 ending stream cut (segment id to offset)
     * @param set  segment records lying between the two cuts
     * @return a future holding the total size
     */
    @VisibleForTesting
    CompletableFuture<Long> sizeBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2, Set<StreamSegmentRecord> set) {
        // Group the segments by shard so that each shard is read once.
        Map<Integer, List<StreamSegmentRecord>> byShard = set.stream()
                .collect(Collectors.groupingBy(segment -> getShardNumber(segment.segmentId())));
        return Futures.allOfWithResults(byShard.entrySet().stream()
                        .map(shard -> getSealedSegmentSizeMapShard(shard.getKey())
                                .thenApply(shardSizes -> shard.getValue().stream()
                                        .collect(Collectors.toMap(segment -> segment, segment -> {
                                            if (shardSizes.getSize(segment.segmentId()) != null) {
                                                return shardSizes.getSize(segment.segmentId());
                                            }
                                            return Long.MIN_VALUE;
                                        }))))
                        .collect(Collectors.toList()))
                .thenApply(perShard -> perShard.stream()
                        .flatMap(shardResult -> shardResult.entrySet().stream())
                        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
                .thenApply(sealedSizes -> {
                    AtomicLong total = new AtomicLong(0L);
                    sealedSizes.forEach((segment, sealedSize) -> {
                        long segmentId = segment.segmentId();
                        boolean inTo = map2.containsKey(segmentId);
                        boolean inFrom = map.containsKey(segmentId);
                        if (inTo && inFrom) {
                            total.addAndGet(map2.get(segmentId) - map.get(segmentId));
                        } else if (inTo) {
                            total.addAndGet(map2.get(segmentId));
                        } else if (inFrom) {
                            if (!$assertionsDisabled && sealedSize.longValue() < 0) {
                                throw new AssertionError();
                            }
                            total.addAndGet(sealedSize.longValue() - map.get(segmentId));
                        } else {
                            if (!$assertionsDisabled && sealedSize.longValue() < 0) {
                                throw new AssertionError();
                            }
                            total.addAndGet(sealedSize.longValue());
                        }
                    });
                    return total.get();
                });
    }

    /**
     * Computes the span of a stream cut: for every segment id in the cut, its segment
     * record and an epoch in which that segment is present.
     *
     * <p>Epochs between the creation epochs of the lowest and highest segment ids are
     * fetched and scanned from newest to oldest. Each segment id is attributed to the first
     * (that is, newest) scanned epoch containing it, and the scan stops early once every id
     * has been resolved.
     *
     * <p>The decompiled form filtered with a lambda that referenced the undefined name
     * {@code r1}; it is restored to the bound method reference {@code segmentIds::contains}.
     *
     * @param map the stream cut (segment id to offset); must not be empty
     * @return a future holding the span (segment record to epoch)
     */
    @VisibleForTesting
    CompletableFuture<Map<StreamSegmentRecord, Integer>> computeStreamCutSpan(Map<Long, Long> map) {
        long mostRecent = map.keySet().stream().max(Comparator.naturalOrder()).get().longValue();
        int epochLow = StreamSegmentNameUtils.getEpoch(map.keySet().stream().min(Comparator.naturalOrder()).get().longValue());
        int epochHigh = StreamSegmentNameUtils.getEpoch(mostRecent);
        return fetchEpochs(epochLow, epochHigh, true).thenApply(epochs -> {
            List<Long> remaining = new ArrayList<>(map.keySet());
            Map<StreamSegmentRecord, Integer> span = new HashMap<>();
            for (int i = epochHigh - epochLow; i >= 0 && !remaining.isEmpty(); i--) {
                EpochRecord epochRecord = epochs.get(i);
                Set<Long> segmentIds = epochRecord.getSegmentIds();
                List<Long> found = remaining.stream()
                        .filter(segmentIds::contains)
                        .collect(Collectors.toList());
                span.putAll(found.stream().collect(Collectors.toMap(
                        id -> epochRecord.getSegments().stream()
                                .filter(segment -> segment.segmentId() == id.longValue())
                                .findFirst()
                                .get(),
                        id -> Integer.valueOf(epochRecord.getEpoch()))));
                // Ids resolved in this epoch must not be matched again in older epochs.
                remaining.removeAll(segmentIds);
            }
            return span;
        });
    }

    /**
     * Checks whether the segments of a stream cut form a valid cut.
     *
     * <p>The key range of every segment in the cut is loaded and handed to
     * {@code RecordHelper.validateStreamCut}. An {@link IllegalArgumentException} from that
     * pipeline is reported as {@code false}; any other failure is logged and propagated.
     *
     * <p>The decompiled form rethrew the raw {@code Throwable} from the handler, which a
     * {@code BiFunction} lambda cannot do. The failure is now rethrown as a
     * {@link CompletionException}, reusing the existing one when present so the cause is
     * not wrapped twice.
     *
     * @param map the stream cut to validate (segment id to offset)
     * @return a future holding {@code true} for a valid cut and {@code false} for an
     *         invalid one
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Boolean> isStreamCutValid(Map<Long, Long> map) {
        return Futures.allOfWithResults(map.keySet().stream()
                        .map(segmentId -> getSegment(segmentId.longValue())
                                .thenApply(segment -> new AbstractMap.SimpleEntry<>(
                                        Double.valueOf(segment.getKeyStart()), Double.valueOf(segment.getKeyEnd()))))
                        .collect(Collectors.toList()))
                .thenAccept(keyRanges -> RecordHelper.validateStreamCut(new ArrayList<>(keyRanges)))
                .handle((ignored, th) -> {
                    if (th == null) {
                        return true;
                    }
                    if (Exceptions.unwrap(th) instanceof IllegalArgumentException) {
                        return false;
                    }
                    log.warn("Exception while trying to validate a stream cut for stream {}/{}", this.scope, this.name);
                    throw (th instanceof CompletionException) ? (CompletionException) th : new CompletionException(th);
                });
    }

    /**
     * Registers a scale request by writing an epoch transition record.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>fail if the stream is sealing or sealed;</li>
     *   <li>use the supplied transition record, or read the stored one when {@code versionedMetadata} is null;</li>
     *   <li>if a non-empty transition already exists, return it when it matches the input, otherwise fail
     *       with a conflict;</li>
     *   <li>if the segments cannot be scaled in the active epoch, rewrite the transition node with the empty
     *       record and fail with a precondition failure;</li>
     *   <li>if the input ranges are valid, compute and store the new transition record; otherwise fail with
     *       an invalid-input error.</li>
     * </ol>
     *
     * @param list              ids of the segments to seal
     * @param list2             key ranges of the replacement segments
     * @param j                 scale timestamp handed to {@code RecordHelper.computeEpochTransition}
     * @param versionedMetadata previously read transition record, or {@code null} to read it from the store
     * @return future holding the accepted (or already matching) transition record with its version
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> submitScale(List<Long> list, List<Map.Entry<Double, Double>> list2, long j, VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return verifyNotSealed().thenCompose(ignored -> {
            if (versionedMetadata == null) {
                return getEpochTransition();
            }
            return CompletableFuture.completedFuture(versionedMetadata);
        }).thenCompose(existing -> getActiveEpochRecord(true).thenCompose(activeEpoch -> {
            EpochTransitionRecord ongoing = existing.getObject();
            if (!ongoing.equals(EpochTransitionRecord.EMPTY)) {
                // A transition is already recorded: only an identical request may proceed.
                if (RecordHelper.verifyRecordMatchesInput(list, list2, false, ongoing)) {
                    return CompletableFuture.completedFuture(existing);
                }
                log.debug("scale conflict, another scale operation is ongoing");
                throw new EpochTransitionOperationExceptions.ConflictException();
            }
            if (!RecordHelper.canScaleFor(list, activeEpoch)) {
                // Rewrite the node with the empty record before reporting the failure.
                return updateEpochTransitionNode(new Data(EpochTransitionRecord.EMPTY.toBytes(), existing.getVersion())).thenApply(version -> {
                    log.warn("scale precondition failed {}", list);
                    throw new EpochTransitionOperationExceptions.PreConditionFailureException();
                });
            }
            if (RecordHelper.validateInputRange(list, list2, activeEpoch)) {
                EpochTransitionRecord computed = RecordHelper.computeEpochTransition(activeEpoch, list, list2, j);
                return updateEpochTransitionNode(new Data(computed.toBytes(), existing.getVersion())).thenApply(version -> {
                    log.info("scale for stream {}/{} accepted. Segments to seal = {}", this.scope, this.name, computed.getSegmentsToSeal());
                    return new VersionedMetadata<>(computed, version);
                });
            }
            log.error("scale input invalid {} {}", list, list2);
            throw new EpochTransitionOperationExceptions.InputInvalidException();
        }));
    }

    /**
     * Guards operations that must not run on a stream that is being sealed or is already sealed.
     *
     * @return future that completes normally when the state read via {@code getState(false)} is neither
     *         {@code SEALING} nor {@code SEALED}, and fails with an {@code ILLEGAL_STATE} store exception otherwise
     */
    private CompletableFuture<Void> verifyNotSealed() {
        return getState(false).thenAccept(current -> {
            boolean sealingOrSealed = current.equals(State.SEALING) || current.equals(State.SEALED);
            if (!sealingOrSealed) {
                return;
            }
            throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + current.name());
        });
    }

    /**
     * Starts a scale once the stream state is {@code SCALING}.
     *
     * <p>The current epoch record is read and the transition record is then either migrated to that epoch
     * ({@code z == true}) or checked for consistency against it ({@code z == false}).
     *
     * @param z                  selects {@code migrateManualScaleToNewEpoch} when true,
     *                           {@code discardInconsistentEpochTransition} when false
     * @param versionedMetadata  epoch transition record to start from
     * @param versionedMetadata2 versioned stream state; its value must equal {@code State.SCALING}
     * @return future holding the transition record to continue the scale with
     * @throws IllegalArgumentException if the supplied state is not {@code SCALING}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> startScale(boolean z, VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2) {
        Preconditions.checkArgument(versionedMetadata2.getObject().equals(State.SCALING));
        return getCurrentEpochRecordData(true).thenCompose(epochData -> {
            EpochRecord currentEpoch = EpochRecord.fromBytes(epochData.getData());
            if (z) {
                return migrateManualScaleToNewEpoch(versionedMetadata, versionedMetadata2, currentEpoch);
            }
            return discardInconsistentEpochTransition(versionedMetadata, versionedMetadata2, currentEpoch);
        });
    }

    /**
     * Accepts the transition record only when it targets an epoch newer than the given epoch record.
     *
     * <p>Otherwise the stored transition is replaced with the empty record, the stream state is set to
     * {@code ACTIVE}, and the returned future fails with an {@link IllegalStateException}.
     *
     * @param versionedMetadata  epoch transition record under inspection
     * @param versionedMetadata2 versioned stream state used for the state update
     * @param epochRecord        epoch record the transition is compared against
     * @return future holding {@code versionedMetadata} when it is consistent; a failed future otherwise
     */
    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> discardInconsistentEpochTransition(VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, EpochRecord epochRecord) {
        if (versionedMetadata.getObject().getNewEpoch() > epochRecord.getEpoch()) {
            return CompletableFuture.completedFuture(versionedMetadata);
        }
        return updateEpochTransitionNode(new Data(EpochTransitionRecord.EMPTY.toBytes(), versionedMetadata.getVersion()))
                .thenCompose(version -> updateVersionedState(versionedMetadata2, State.ACTIVE))
                .thenApply(updatedState -> {
                    log.warn("Scale epoch transition record is inconsistent with data in the table. {}", versionedMetadata.getObject().getNewEpoch());
                    throw new IllegalStateException("Epoch transition record is inconsistent.");
                });
    }

    /**
     * Creates the epoch described by an epoch transition record, unless it already exists.
     *
     * <p>Nothing is written when the active epoch is already at or beyond the transition's new epoch.
     * Otherwise, in order: the new epoch record is created, the history time series is updated, and a
     * sealed-epoch record is written for every segment being sealed.
     *
     * @param versionedMetadata epoch transition record describing segments to seal and segments to create
     * @return future holding the same {@code versionedMetadata} once all records are written
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> scaleCreateNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        return getActiveEpochRecord(true).thenCompose(activeEpoch -> {
            EpochTransitionRecord transition = versionedMetadata.getObject();
            if (activeEpoch.getEpoch() >= transition.getNewEpoch()) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }
            // The new epoch's creation time is forced to be later than the active epoch's.
            long creationTime = Math.max(transition.getTime(), activeEpoch.getCreationTime() + 1);
            List<StreamSegmentRecord> created = transition.getNewSegmentsWithRange().entrySet().stream()
                    .map(entry -> newSegmentRecord(entry.getKey(), creationTime, entry.getValue().getKey(), entry.getValue().getValue()))
                    .collect(Collectors.toList());
            List<StreamSegmentRecord> sealed = transition.getSegmentsToSeal().stream()
                    .map(activeEpoch::getSegment)
                    .collect(Collectors.toList());
            // Segments of the new epoch = active segments minus the sealed ones plus the created ones.
            List<StreamSegmentRecord> segments = new LinkedList<>(activeEpoch.getSegments());
            segments.removeIf(segment -> transition.getSegmentsToSeal().contains(segment.segmentId()));
            segments.addAll(created);
            EpochRecord newEpochRecord = EpochRecord.builder().epoch(transition.getNewEpoch()).referenceEpoch(transition.getNewEpoch()).segments(segments).creationTime(creationTime).m100build();
            HistoryTimeSeriesRecord historyRecord = HistoryTimeSeriesRecord.builder().epoch(transition.getNewEpoch()).referenceEpoch(transition.getNewEpoch()).segmentsCreated(created).segmentsSealed(sealed).creationTime(newEpochRecord.getCreationTime()).m109build();
            return createEpochRecord(newEpochRecord)
                    .thenCompose(ignored -> updateHistoryTimeSeries(historyRecord))
                    .thenCompose(ignored -> Futures.allOf(transition.getSegmentsToSeal().stream()
                            .map(segmentId -> recordSegmentSealedEpoch(segmentId, transition.getNewEpoch()))
                            .collect(Collectors.toList())))
                    .thenApply(ignored -> versionedMetadata);
        });
    }

    /**
     * Records the epoch in which a segment was sealed by delegating to
     * {@code createSegmentSealedEpochRecordData}.
     *
     * @param j segment id
     * @param i epoch to record for the segment
     * @return future completing when the delegate completes
     */
    private CompletableFuture<Void> recordSegmentSealedEpoch(long j, int i) {
        return createSegmentSealedEpochRecordData(j, i);
    }

    /**
     * Adds a history record to the time-series chunk that its epoch falls into.
     *
     * <p>The chunk number is {@code epoch / historyChunkSize}. When the epoch is the first of its chunk a new
     * chunk is created with the record; otherwise the existing chunk is read and the record is appended only
     * if the chunk's latest record has a smaller epoch.
     *
     * @param historyTimeSeriesRecord record to add
     * @return future completing once the chunk is created or updated, or immediately after the read when
     *         the chunk's latest record is not older than the given one
     */
    private CompletableFuture<Void> updateHistoryTimeSeries(HistoryTimeSeriesRecord historyTimeSeriesRecord) {
        int chunkNumber = historyTimeSeriesRecord.getEpoch() / this.historyChunkSize.get();
        boolean firstInChunk = historyTimeSeriesRecord.getEpoch() % this.historyChunkSize.get() == 0;
        if (firstInChunk) {
            return createHistoryTimeSeriesChunk(chunkNumber, historyTimeSeriesRecord);
        }
        return getHistoryTimeSeriesChunkData(chunkNumber, true).thenCompose(chunkData -> {
            HistoryTimeSeries series = HistoryTimeSeries.fromBytes(chunkData.getData());
            if (series.getLatestRecord().getEpoch() >= historyTimeSeriesRecord.getEpoch()) {
                // The chunk's latest record is not older than this one; nothing to write.
                return CompletableFuture.completedFuture(null);
            }
            return Futures.toVoid(updateHistoryTimeSeriesChunkData(chunkNumber, new Data(HistoryTimeSeries.addHistoryRecord(series, historyTimeSeriesRecord).toBytes(), chunkData.getVersion())));
        });
    }

    /**
     * Re-targets a scale's epoch transition record at the given current epoch when that is possible.
     *
     * <p>After reading the epoch record of the transition's active epoch, one of three things happens:
     * <ul>
     *   <li>the transition's active epoch equals the current epoch: the record is returned unchanged;</li>
     *   <li>the current epoch is not newer than the transition's active epoch, or the two epochs have
     *       different reference epochs: the stored transition is replaced with the empty record, the state
     *       is set to {@code ACTIVE} and the future fails with an {@link IllegalStateException};</li>
     *   <li>otherwise: the segments to seal are re-mapped to ids in the current epoch (same segment number,
     *       current epoch), a new transition is computed and stored, and it is returned with its version.</li>
     * </ul>
     *
     * @param versionedMetadata  epoch transition record to migrate
     * @param versionedMetadata2 versioned stream state used when the transition has to be discarded
     * @param epochRecord        current epoch record
     * @return future holding the transition record to continue with
     */
    private CompletableFuture<VersionedMetadata<EpochTransitionRecord>> migrateManualScaleToNewEpoch(VersionedMetadata<EpochTransitionRecord> versionedMetadata, VersionedMetadata<State> versionedMetadata2, EpochRecord epochRecord) {
        EpochTransitionRecord transition = versionedMetadata.getObject();
        return getEpochRecord(transition.getActiveEpoch()).thenCompose(transitionActiveEpoch -> {
            if (transition.getActiveEpoch() == epochRecord.getEpoch()) {
                return CompletableFuture.completedFuture(versionedMetadata);
            }
            if (epochRecord.getEpoch() <= transition.getActiveEpoch() || epochRecord.getReferenceEpoch() != transitionActiveEpoch.getReferenceEpoch()) {
                return updateEpochTransitionNode(new Data(EpochTransitionRecord.EMPTY.toBytes(), versionedMetadata.getVersion()))
                        .thenCompose(version -> updateVersionedState(versionedMetadata2, State.ACTIVE))
                        .thenApply(updatedState -> {
                            log.warn("Scale epoch transition record is inconsistent with Data in the table. {}", transition.getNewEpoch());
                            throw new IllegalStateException("Epoch transition record is inconsistent.");
                        });
            }
            // Same segment numbers, but expressed as ids in the current epoch.
            List<Long> segmentsToSeal = transition.getSegmentsToSeal().stream()
                    .map(segmentId -> StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(segmentId), epochRecord.getEpoch()))
                    .collect(Collectors.toList());
            EpochTransitionRecord migrated = RecordHelper.computeEpochTransition(epochRecord, segmentsToSeal, Lists.newArrayList(transition.getNewSegmentsWithRange().values()), transition.getTime());
            return updateEpochTransitionNode(new Data(migrated.toBytes(), versionedMetadata.getVersion()))
                    .thenApply(version -> new VersionedMetadata<>(migrated, version));
        });
    }

    /**
     * Reads the stored epoch transition node and deserializes it.
     *
     * @return future holding the epoch transition record together with the version of the node it was read from
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<EpochTransitionRecord>> getEpochTransition() {
        return getEpochTransitionNode().thenApply(node ->
                new VersionedMetadata<>(EpochTransitionRecord.fromBytes(node.getData()), node.getVersion()));
    }

    /**
     * Removes the cold marker of every given segment.
     *
     * @param set ids of the segments whose markers are removed
     * @return future completing once every {@code removeColdMarker} call has completed
     */
    private CompletableFuture<Void> clearMarkers(Set<Long> set) {
        List<CompletableFuture<Void>> removals = set.stream()
                .parallel()
                .map(this::removeColdMarker)
                .collect(Collectors.toList());
        return Futures.toVoid(Futures.allOfWithResults(removals));
    }

    /**
     * Finishes the sealing step of a scale.
     *
     * <p>In order: the markers of the sealed segments are cleared, the sealed segment sizes are recorded,
     * and the current epoch record is switched to the transition's new epoch.
     *
     * @param map               sizes to record, passed to {@code updateSealedSegmentSizes}
     * @param versionedMetadata epoch transition record of the scale in progress
     * @return future completing after the current epoch record has been updated
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> scaleOldSegmentsSealed(Map<Long, Long> map, VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        EpochTransitionRecord transition = versionedMetadata.getObject();
        return Futures.toVoid(clearMarkers(transition.getSegmentsToSeal())
                .thenCompose(ignored -> updateSealedSegmentSizes(map))
                .thenCompose(ignored -> updateCurrentEpochRecord(transition.getNewEpoch())));
    }

    /**
     * Marks a scale as complete by overwriting the epoch transition node with the empty record.
     *
     * @param versionedMetadata transition record of the scale being completed; must be non-null and must
     *                          not be the empty record; its version is used for the write
     * @return future completing when the transition node has been updated
     * @throws NullPointerException     if {@code versionedMetadata} is null
     * @throws IllegalArgumentException if the record equals {@code EpochTransitionRecord.EMPTY}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeScale(VersionedMetadata<EpochTransitionRecord> versionedMetadata) {
        Preconditions.checkNotNull(versionedMetadata);
        boolean transitionPresent = !versionedMetadata.getObject().equals(EpochTransitionRecord.EMPTY);
        Preconditions.checkArgument(transitionPresent);
        Data emptyTransition = new Data(EpochTransitionRecord.EMPTY.toBytes(), versionedMetadata.getVersion());
        return Futures.toVoid(updateEpochTransitionNode(emptyTransition));
    }

    /**
     * Converts the committing-transactions record into a rolling-transaction record, if it is not one already.
     *
     * @param i                 value passed to {@code createRollingTxnRecord}
     * @param versionedMetadata current committing-transactions record and its version
     * @return future holding the unchanged input when it already is a rolling-transaction record, otherwise
     *         the newly stored rolling-transaction record with its new version
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startRollingTxn(int i, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        CommittingTransactionsRecord current = versionedMetadata.getObject();
        if (!current.isRollingTxnRecord()) {
            CommittingTransactionsRecord rolling = current.createRollingTxnRecord(i);
            return updateCommittingTxnRecord(new Data(rolling.toBytes(), versionedMetadata.getVersion()))
                    .thenApply(newVersion -> new VersionedMetadata<>(rolling, newVersion));
        }
        return CompletableFuture.completedFuture(versionedMetadata);
    }

    /**
     * Creates the two duplicate epochs used by a rolling transaction: one numbered
     * {@code getNewTxnEpoch()} and one numbered {@code getNewActiveEpoch()}, each with segments that keep
     * their segment number and key range but carry ids computed for the new epoch.
     *
     * <p>Writes happen in this order: first duplicate epoch record, its history record, second duplicate
     * epoch record, its history record, two rounds of sealed-epoch records, and finally the sealed segment
     * sizes from {@code map}.
     *
     * <p>NOTE(review): the outer and the inner lambda both name their parameter {@code epochRecord}. Java does
     * not allow a lambda parameter to redeclare a name from an enclosing lambda, so this does not compile as
     * written, and it is not visible from this code which uses refer to the active epoch (outer) and which to
     * the epoch read via {@code getEpochRecord(object.getEpoch())} (inner). Presumably a decompilation
     * artifact — confirm against the original source before touching the logic.
     *
     * @param map               sizes passed to {@code updateSealedSegmentSizes} after the epochs are handled
     * @param j                 lower bound for the creation time of the first duplicate epoch
     * @param versionedMetadata committing-transactions record; must be a rolling-transaction record
     * @return future completing once the sealed segment sizes have been updated
     * @throws IllegalArgumentException if the record is not a rolling-transaction record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> rollingTxnCreateDuplicateEpochs(Map<Long, Long> map, long j, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        Preconditions.checkArgument(versionedMetadata.getObject().isRollingTxnRecord());
        CommittingTransactionsRecord object = versionedMetadata.getObject();
        return getActiveEpoch(true).thenCompose(epochRecord -> {
            return getEpochRecord(object.getEpoch()).thenCompose(epochRecord -> {
                // An epoch newer than the record's current epoch means the duplicates already exist.
                if (epochRecord.getEpoch() > object.getCurrentEpoch()) {
                    log.debug("Duplicate Epochs {} already created. Ignore.", Integer.valueOf(object.getNewActiveEpoch()));
                    return CompletableFuture.completedFuture(null);
                }
                // Creation time is at least j and strictly later than the epoch record's creation time.
                long max = Math.max(epochRecord.getCreationTime() + 1, j);
                // Segments of the duplicate epoch numbered getNewTxnEpoch(), created at max.
                List<StreamSegmentRecord> list = (List) epochRecord.getSegments().stream().map(streamSegmentRecord -> {
                    return newSegmentRecord(StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(streamSegmentRecord.segmentId()), object.getNewTxnEpoch()), max, Double.valueOf(streamSegmentRecord.getKeyStart()), Double.valueOf(streamSegmentRecord.getKeyEnd()));
                }).collect(Collectors.toList());
                // Segments of the duplicate epoch numbered getNewActiveEpoch(), created at max + 1.
                List<StreamSegmentRecord> list2 = (List) epochRecord.getSegments().stream().map(streamSegmentRecord2 -> {
                    return newSegmentRecord(StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(streamSegmentRecord2.segmentId()), object.getNewActiveEpoch()), max + 1, Double.valueOf(streamSegmentRecord2.getKeyStart()), Double.valueOf(streamSegmentRecord2.getKeyEnd()));
                }).collect(Collectors.toList());
                EpochRecord m100build = EpochRecord.builder().epoch(object.getNewTxnEpoch()).referenceEpoch(epochRecord.getReferenceEpoch()).segments(list).creationTime(max).m100build();
                EpochRecord m100build2 = EpochRecord.builder().epoch(object.getNewActiveEpoch()).referenceEpoch(epochRecord.getReferenceEpoch()).segments(list2).creationTime(max + 1).m100build();
                // History records for the duplicates carry no created/sealed segment lists.
                HistoryTimeSeriesRecord m109build = HistoryTimeSeriesRecord.builder().epoch(m100build.getEpoch()).referenceEpoch(m100build.getReferenceEpoch()).creationTime(max).m109build();
                HistoryTimeSeriesRecord m109build2 = HistoryTimeSeriesRecord.builder().epoch(m100build2.getEpoch()).referenceEpoch(m100build2.getReferenceEpoch()).creationTime(max + 1).m109build();
                return createEpochRecord(m100build).thenCompose(r5 -> {
                    return updateHistoryTimeSeries(m109build);
                }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r52 -> {
                    return createEpochRecord(m100build2);
                }).thenCompose(r53 -> {
                    return updateHistoryTimeSeries(m109build2);
                }).thenCompose(r7 -> {
                    // Segments of epochRecord are recorded as sealed in the first duplicate epoch.
                    return Futures.allOf((Collection) epochRecord.getSegments().stream().map(streamSegmentRecord3 -> {
                        return recordSegmentSealedEpoch(streamSegmentRecord3.segmentId(), m100build.getEpoch());
                    }).collect(Collectors.toList()));
                }).thenCompose(r72 -> {
                    // Segments of the first duplicate epoch are recorded as sealed in the second one.
                    return Futures.allOf((Collection) m100build.getSegments().stream().map(streamSegmentRecord3 -> {
                        return recordSegmentSealedEpoch(streamSegmentRecord3.segmentId(), m100build2.getEpoch());
                    }).collect(Collectors.toList()));
                });
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r5 -> {
                return updateSealedSegmentSizes(map);
            });
        });
    }

    /**
     * Completes a rolling transaction by switching the current epoch to the record's new active epoch.
     *
     * <p>Work is done only while the active epoch still equals the record's current epoch. In that case,
     * in order: the sealed segment sizes are recorded, the markers of those segments are cleared, and the
     * current epoch record is updated. Otherwise the call is a no-op.
     *
     * @param map               sealed segment sizes keyed by segment id; the keys are also the segments
     *                          whose markers are cleared
     * @param versionedMetadata committing-transactions record of the rolling transaction
     * @return future completing when the update is done or was found unnecessary
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeRollingTxn(Map<Long, Long> map, VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        return getActiveEpoch(true).thenCompose(activeEpoch -> {
            CommittingTransactionsRecord committing = versionedMetadata.getObject();
            if (activeEpoch.getEpoch() != committing.getCurrentEpoch()) {
                return CompletableFuture.completedFuture(null);
            }
            return updateSealedSegmentSizes(map)
                    .thenCompose(ignored -> clearMarkers(map.keySet()))
                    .thenCompose(ignored -> updateCurrentEpochRecord(committing.getNewActiveEpoch()));
        });
    }

    /**
     * Generates a transaction id from the reference epoch of the active epoch record.
     *
     * @param i passed through to {@code RecordHelper.generateTxnId} as its second argument
     * @param j passed through to {@code RecordHelper.generateTxnId} as its third argument
     * @return future holding the generated id
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<UUID> generateNewTxnId(int i, long j) {
        return getActiveEpochRecord(true).thenApply(activeEpoch -> {
            int referenceEpoch = activeEpoch.getReferenceEpoch();
            return RecordHelper.generateTxnId(referenceEpoch, i, j);
        });
    }

    /**
     * Creates a new open transaction, provided the stream is neither sealing nor sealed.
     *
     * <p>Both expiry times are computed relative to the wall-clock time taken at the start of the call.
     *
     * @param uuid transaction id; its epoch is extracted with {@code RecordHelper.getTransactionEpoch}
     * @param j    lease duration in milliseconds, added to the creation time
     * @param j2   maximum execution duration in milliseconds, added to the creation time
     * @return future holding the versioned data of the created transaction
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> createTransaction(UUID uuid, long j, long j2) {
        long createdAt = System.currentTimeMillis();
        long leaseExpiry = createdAt + j;
        long maxExecutionExpiry = createdAt + j2;
        int txnEpoch = RecordHelper.getTransactionEpoch(uuid);
        ActiveTxnRecord openRecord = ActiveTxnRecord.builder()
                .txnStatus(TxnStatus.OPEN)
                .leaseExpiryTime(leaseExpiry)
                .txCreationTimestamp(createdAt)
                .maxExecutionExpiryTime(maxExecutionExpiry)
                .m91build();
        return verifyNotSealed()
                .thenCompose(ignored -> createNewTransaction(txnEpoch, uuid, openRecord.toBytes()))
                .thenApply(version -> new VersionedTransactionData(txnEpoch, uuid, version, TxnStatus.OPEN, createdAt, maxExecutionExpiry));
    }

    /**
     * Renews a transaction's lease: the active transaction record is rewritten with a lease expiry of
     * "now + {@code j}", keeping its creation time, maximum execution expiry and status.
     *
     * @param versionedTransactionData current transaction data; its version is used for the write
     * @param j                        lease duration in milliseconds, counted from the current time
     * @return future holding the transaction data with the version returned by the update
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> pingTransaction(VersionedTransactionData versionedTransactionData, long j) {
        int txnEpoch = versionedTransactionData.getEpoch();
        UUID txnId = versionedTransactionData.getId();
        Version currentVersion = versionedTransactionData.getVersion();
        long createdAt = versionedTransactionData.getCreationTime();
        long maxExecutionExpiry = versionedTransactionData.getMaxExecutionExpiryTime();
        TxnStatus txnStatus = versionedTransactionData.getStatus();
        ActiveTxnRecord renewed = new ActiveTxnRecord(createdAt, System.currentTimeMillis() + j, maxExecutionExpiry, txnStatus);
        return updateActiveTx(txnEpoch, txnId, new Data(renewed.toBytes(), currentVersion))
                .thenApply(newVersion -> new VersionedTransactionData(txnEpoch, txnId, newVersion, txnStatus, createdAt, maxExecutionExpiry));
    }

    /**
     * Reads the active record of a transaction and returns it as versioned transaction data.
     *
     * @param uuid transaction id; its epoch is extracted with {@code RecordHelper.getTransactionEpoch}
     * @return future holding epoch, id, version, status, creation time and maximum execution expiry of the
     *         transaction
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> getTransactionData(UUID uuid) {
        int txnEpoch = RecordHelper.getTransactionEpoch(uuid);
        return getActiveTx(txnEpoch, uuid).thenApply(txnData -> {
            ActiveTxnRecord active = ActiveTxnRecord.fromBytes(txnData.getData());
            return new VersionedTransactionData(
                    txnEpoch,
                    uuid,
                    txnData.getVersion(),
                    active.getTxnStatus(),
                    active.getTxCreationTimestamp(),
                    active.getMaxExecutionExpiryTime());
        });
    }

    /**
     * Determines the status of a transaction.
     *
     * <p>The active transaction record is consulted first. When it does not exist (a
     * {@code DataNotFoundException} after unwrapping) the status is looked up among the completed
     * transactions instead. Any other failure is propagated wrapped in a {@link CompletionException}.
     *
     * @param uuid transaction id
     * @return future holding the status from the active record, or the result of the completed-transaction
     *         lookup when there is no active record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> checkTransactionStatus(UUID uuid) {
        return getActiveTx(RecordHelper.getTransactionEpoch(uuid), uuid).handle((txnData, th) -> {
            if (th == null) {
                return ActiveTxnRecord.fromBytes(txnData.getData()).getTxnStatus();
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return TxnStatus.UNKNOWN;
            }
            throw new CompletionException(th);
        }).thenCompose(activeStatus -> {
            if (activeStatus.equals(TxnStatus.UNKNOWN)) {
                return getCompletedTxnStatus(uuid);
            }
            return CompletableFuture.completedFuture(activeStatus);
        });
    }

    /**
     * Looks up the completion status of a transaction among the completed transactions.
     *
     * @param uuid transaction id
     * @return future holding the recorded completion status, or {@code TxnStatus.UNKNOWN} when the lookup
     *         fails with a {@code DataNotFoundException} (after unwrapping); other failures are propagated
     *         wrapped in a {@link CompletionException}
     */
    private CompletableFuture<TxnStatus> getCompletedTxnStatus(UUID uuid) {
        return getCompletedTx(uuid).handle((completedData, th) -> {
            if (th == null) {
                return CompletedTxnRecord.fromBytes(completedData.getData()).getCompletionStatus();
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return TxnStatus.UNKNOWN;
            }
            throw new CompletionException(th);
        });
    }

    /**
     * Seals a transaction for commit or abort.
     *
     * <p>The active transaction is sealed first. When that yields {@code TxnStatus.UNKNOWN} (which
     * {@code handleDataNotFoundException} produces for a missing active record), the completed transactions
     * are checked via {@code validateCompletedTxn}; in that case the returned entry has a null value.
     *
     * @param uuid     transaction id
     * @param z        {@code true} to seal for commit, {@code false} to seal for abort
     * @param optional version to use for the update; when empty the version of the stored record is used
     * @return future holding the resulting status paired with the transaction epoch, or with null when the
     *         status came from the completed-transaction check
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(UUID uuid, boolean z, Optional<Version> optional) {
        int txnEpoch = RecordHelper.getTransactionEpoch(uuid);
        return sealActiveTxn(txnEpoch, uuid, z, optional)
                .exceptionally(th -> new AbstractMap.SimpleEntry<TxnStatus, Integer>(handleDataNotFoundException(th), null))
                .thenCompose(sealed -> {
                    if (sealed.getKey() != TxnStatus.UNKNOWN) {
                        return CompletableFuture.completedFuture(sealed);
                    }
                    return validateCompletedTxn(uuid, z, "seal")
                            .thenApply(completedStatus -> new AbstractMap.SimpleEntry<TxnStatus, Integer>(completedStatus, null));
                });
    }

    /**
     * Seals an active transaction for commit ({@code z == true}) or abort ({@code z == false}) depending on
     * its current status.
     *
     * <p>NOTE(review): the switch runs over an ordinal switch-map ({@code AnonymousClass1}) that is not
     * visible here, and the case labels are unrelated {@code ApiResponseMessage} constants used only for their
     * integer values (1-5). Which {@code TxnStatus} each case stands for cannot be read from this method;
     * the per-case comments below describe only what each branch does. Presumably decompiler output — confirm
     * the mapping before changing the switch.
     *
     * @param i        transaction epoch
     * @param uuid     transaction id
     * @param z        {@code true} to seal for commit, {@code false} to seal for abort
     * @param optional version to use for the update; when empty the version of the stored record is used
     * @return future holding the resulting status paired with the epoch {@code i}
     */
    private CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealActiveTxn(int i, UUID uuid, boolean z, Optional<Version> optional) {
        return getActiveTx(i, uuid).thenCompose(data -> {
            ActiveTxnRecord fromBytes = ActiveTxnRecord.fromBytes(data.getData());
            data.getClass();
            // Caller-supplied version wins; otherwise fall back to the version of the record just read.
            Version version = (Version) optional.orElseGet(data::getVersion);
            TxnStatus txnStatus = fromBytes.getTxnStatus();
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                // Cases 1-2: a commit request is rejected; an abort request returns the status unchanged.
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                    if (z) {
                        throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                    }
                    return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                // Case 3: the record is rewritten as COMMITTING or ABORTING, and that status is returned.
                case ApiResponseMessage.INFO /* 3 */:
                    return sealActiveTx(i, uuid, z, fromBytes, version).thenApply(version2 -> {
                        return new AbstractMap.SimpleEntry(z ? TxnStatus.COMMITTING : TxnStatus.ABORTING, Integer.valueOf(i));
                    });
                // Cases 4-5: a commit request returns the status unchanged; an abort request is rejected.
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    if (z) {
                        return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                    }
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                // Any other status is reported as "data not found".
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        });
    }

    /**
     * Rewrites an active transaction record with a sealing status, keeping its timestamps.
     *
     * @param i               transaction epoch
     * @param uuid            transaction id
     * @param z               {@code true} writes {@code COMMITTING}, {@code false} writes {@code ABORTING}
     * @param activeTxnRecord record whose creation time, lease expiry and max execution expiry are carried over
     * @param version         version to use for the update
     * @return future holding the version returned by {@code updateActiveTx}
     */
    private CompletableFuture<Version> sealActiveTx(int i, UUID uuid, boolean z, ActiveTxnRecord activeTxnRecord, Version version) {
        long createdAt = activeTxnRecord.getTxCreationTimestamp();
        long leaseExpiry = activeTxnRecord.getLeaseExpiryTime();
        long maxExecutionExpiry = activeTxnRecord.getMaxExecutionExpiryTime();
        TxnStatus sealingStatus;
        if (z) {
            sealingStatus = TxnStatus.COMMITTING;
        } else {
            sealingStatus = TxnStatus.ABORTING;
        }
        ActiveTxnRecord sealedRecord = new ActiveTxnRecord(createdAt, leaseExpiry, maxExecutionExpiry, sealingStatus);
        return updateActiveTx(i, uuid, new Data(sealedRecord.toBytes(), version));
    }

    /**
     * Commits a transaction: validates its status, records it as completed with {@code COMMITTED} when its
     * status is {@code COMMITTING}, removes its active entry and reports {@code COMMITTED}.
     *
     * <p>NOTE(review): the switch runs over an ordinal switch-map ({@code AnonymousClass1}) that is not
     * visible here; the case labels are unrelated {@code ApiResponseMessage} constants used only for their
     * integer values. The status-to-case mapping therefore cannot be read from this method. The
     * {@code (Function<? super U, ...>)} cast refers to a type variable that is not declared in this
     * method. Both look like decompiler output — confirm against the original source.
     *
     * @param uuid transaction id
     * @return future holding {@code TxnStatus.COMMITTED}; fails with an {@code ILLEGAL_STATE} or
     *         {@code DATA_NOT_FOUND} store exception for the statuses rejected by the switch
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> commitTransaction(UUID uuid) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return checkTransactionStatus(uuid).thenApply(txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                // Cases 1-3: statuses from which a commit is not allowed.
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                case ApiResponseMessage.INFO /* 3 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.toString());
                // Cases 4-5: statuses from which the commit proceeds.
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    return txnStatus;
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus2 -> {
            // The completed entry is written only when the status is COMMITTING.
            return txnStatus2.equals(TxnStatus.COMMITTING) ? createCompletedTxEntry(uuid, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.COMMITTED).toBytes()) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.COMMITTED;
        });
    }

    /**
     * Aborts a transaction: validates its status, records it as completed with {@code ABORTED} when its
     * status is {@code ABORTING}, removes its active entry and reports {@code ABORTED}.
     *
     * <p>NOTE(review): the switch runs over an ordinal switch-map ({@code AnonymousClass1}) that is not
     * visible here; the case labels are unrelated {@code ApiResponseMessage} constants used only for their
     * integer values. The status-to-case mapping therefore cannot be read from this method. The
     * {@code (Function<? super U, ...>)} cast refers to a type variable that is not declared in this
     * method. Both look like decompiler output — confirm against the original source.
     *
     * @param uuid transaction id
     * @return future holding {@code TxnStatus.ABORTED}; fails with an {@code ILLEGAL_STATE} or
     *         {@code DATA_NOT_FOUND} store exception for the statuses rejected by the switch
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> abortTransaction(UUID uuid) {
        int transactionEpoch = RecordHelper.getTransactionEpoch(uuid);
        return checkTransactionStatus(uuid).thenApply(txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                // Cases 1-2: statuses from which the abort proceeds.
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                    return txnStatus;
                // Cases 3-5: statuses from which an abort is not allowed.
                case ApiResponseMessage.INFO /* 3 */:
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus2 -> {
            // The completed entry is written only when the status is ABORTING.
            return txnStatus2.equals(TxnStatus.ABORTING) ? createCompletedTxEntry(uuid, new CompletedTxnRecord(System.currentTimeMillis(), TxnStatus.ABORTED).toBytes()) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.ABORTED;
        });
    }

    /**
     * Translates a "data not found" failure into {@code TxnStatus.UNKNOWN} and rethrows everything else.
     *
     * <p>This method declares no checked exceptions, so a bare {@code Throwable} cannot be rethrown as is:
     * runtime exceptions and errors are rethrown untouched, and any other throwable is wrapped in a
     * {@link CompletionException}.
     *
     * @param th failure to inspect
     * @return {@code TxnStatus.UNKNOWN} when the unwrapped failure is a {@code DataNotFoundException}
     */
    private TxnStatus handleDataNotFoundException(Throwable th) {
        if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
            return TxnStatus.UNKNOWN;
        }
        if (th instanceof RuntimeException) {
            throw (RuntimeException) th;
        }
        if (th instanceof Error) {
            throw (Error) th;
        }
        throw new CompletionException(th);
    }

    /**
     * Checks that a completed transaction finished the way the caller expects.
     *
     * @param uuid transaction id
     * @param z    {@code true} when the expected completion status is {@code COMMITTED}, {@code false} when
     *             it is {@code ABORTED}
     * @param str  not read by this method
     * @return future holding the completion status when it matches the expectation; fails with a
     *         {@code DATA_NOT_FOUND} store exception when the status is {@code UNKNOWN} and with an
     *         {@code ILLEGAL_STATE} store exception for any other mismatch
     */
    private CompletableFuture<TxnStatus> validateCompletedTxn(UUID uuid, boolean z, String str) {
        return getCompletedTxnStatus(uuid).thenApply(completedStatus -> {
            TxnStatus expected = z ? TxnStatus.COMMITTED : TxnStatus.ABORTED;
            if (completedStatus == expected) {
                return completedStatus;
            }
            if (completedStatus != TxnStatus.UNKNOWN) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + completedStatus.name());
            }
            throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
        });
    }

    /**
     * Returns all current transactions, with string keys parsed into ids and payloads deserialized.
     *
     * @return future holding a map from transaction id to its active transaction record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return getCurrentTxns().thenApply(currentTxns -> currentTxns.entrySet().stream()
                .collect(Collectors.toMap(
                        txn -> UUID.fromString(txn.getKey()),
                        txn -> ActiveTxnRecord.fromBytes(txn.getValue().getData()))));
    }

    /**
     * Reads the current epoch record data and deserializes it.
     *
     * @param z passed through to {@code getCurrentEpochRecordData}
     * @return future holding the deserialized epoch record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<EpochRecord> getActiveEpoch(boolean z) {
        return getCurrentEpochRecordData(z).thenApply(epochData -> EpochRecord.fromBytes(epochData.getData()));
    }

    /**
     * Reads the record data of the given epoch and deserializes it.
     *
     * @param i epoch number
     * @return future holding the deserialized epoch record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<EpochRecord> getEpochRecord(int i) {
        return getEpochRecordData(i).thenApply(epochData -> EpochRecord.fromBytes(epochData.getData()));
    }

    /**
     * Stores a cold marker value for a segment, creating the marker when none exists yet.
     *
     * @param j  segment id
     * @param j2 marker value; for an existing marker it is written as 8 bytes using the version just read
     * @return future completing when the marker has been created or updated
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> setColdMarker(long j, long j2) {
        return getMarkerData(j).thenCompose(existing -> {
            if (existing != null) {
                byte[] encoded = new byte[Long.BYTES];
                BitConverter.writeLong(encoded, 0, j2);
                return Futures.toVoid(updateMarkerData(j, new Data(encoded, existing.getVersion())));
            }
            return createMarkerData(j, j2);
        });
    }

    /**
     * Reads the cold marker value of a segment.
     *
     * @param j segment id
     * @return future holding the long decoded from the start of the marker data, or 0 when the marker data
     *         is null
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getColdMarker(long j) {
        return getMarkerData(j).thenApply(marker -> {
            if (marker == null) {
                return 0L;
            }
            return BitConverter.readLong(marker.getData(), 0);
        });
    }

    /**
     * Removes the cold marker of a segment by delegating to {@code removeMarkerData}.
     *
     * @param j segment id
     * @return future completing when the delegate completes
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> removeColdMarker(long j) {
        return removeMarkerData(j);
    }

    /**
     * Computes the size up to a stream cut, optionally building on a previously recorded stream cut.
     *
     * <p>The size between the reference cut (the previous record's cut, or an empty map when there is none)
     * and {@code map} is computed and the previous record's recording size (or 0) is added to it.
     *
     * @param map      stream cut to measure up to
     * @param optional previously recorded stream cut to start from, if any
     * @return future holding the size between the two cuts plus the previous recording size
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getSizeTillStreamCut(Map<Long, Long> map, Optional<StreamCutRecord> optional) {
        Map<Long, Long> reference = optional.<Map<Long, Long>>map(StreamCutRecord::getStreamCut)
                .orElse(Collections.emptyMap());
        return segmentsBetweenStreamCuts(reference, map)
                .thenCompose(between -> sizeBetweenStreamCuts(reference, map, between)
                        .thenApply(sizeBetween -> {
                            long previousSize = optional.map(StreamCutRecord::getRecordingSize).orElse(0L);
                            return sizeBetween.longValue() + previousSize;
                        }));
    }

    /**
     * Persists {@code streamCutRecord} and then updates the retention set with the result of
     * {@code RetentionSet.addReferenceToStreamCutIfLatest}.
     *
     * <p>The stream-cut record data is created first; only after that completes is the
     * retention set written back, using the version read at the start.
     *
     * @param streamCutRecord stream cut to add
     * @return future that completes when the retention set update has finished
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> addStreamCutToRetentionSet(StreamCutRecord streamCutRecord) {
        return getRetentionSetData().thenCompose(retentionData -> {
            RetentionSet current = RetentionSet.fromBytes(retentionData.getData());
            RetentionSet updated = RetentionSet.addReferenceToStreamCutIfLatest(current, streamCutRecord);
            return createStreamCutRecordData(streamCutRecord.getRecordingTime(), streamCutRecord.toBytes())
                    .thenCompose(created -> Futures.toVoid(
                            updateRetentionSetData(new Data(updated.toBytes(), retentionData.getVersion()))));
        });
    }

    /**
     * Reads the retention set data and deserializes it.
     *
     * @return future holding the decoded {@link RetentionSet}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<RetentionSet> getRetentionSet() {
        return getRetentionSetData()
                .thenApply(retentionData -> RetentionSet.fromBytes(retentionData.getData()));
    }

    /**
     * Resolves a stream-cut reference to the full record, looked up by the reference's
     * recording time.
     *
     * @param streamCutReferenceRecord reference whose recording time keys the lookup
     * @return future holding the decoded {@link StreamCutRecord}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamCutRecord> getStreamCutRecord(StreamCutReferenceRecord streamCutReferenceRecord) {
        final long recordingTime = streamCutReferenceRecord.getRecordingTime();
        return getStreamCutRecordData(recordingTime)
                .thenApply(cutData -> StreamCutRecord.fromBytes(cutData.getData()));
    }

    /**
     * Deletes the stream-cut records that {@code retentionRecordsBefore} returns for the given
     * reference, then writes back the retention set produced by
     * {@code RetentionSet.removeStreamCutBefore}.
     *
     * <p>All record deletions are started first; the retention set is updated only after every
     * deletion has completed, using the version read at the start.
     *
     * @param streamCutReferenceRecord reference used as the cut-off
     * @return future that completes when the retention set update has finished
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteStreamCutBefore(StreamCutReferenceRecord streamCutReferenceRecord) {
        return getRetentionSetData().thenCompose(retentionData -> {
            RetentionSet current = RetentionSet.fromBytes(retentionData.getData());
            RetentionSet trimmed = RetentionSet.removeStreamCutBefore(current, streamCutReferenceRecord);

            List<CompletableFuture<Void>> deletions = new ArrayList<>();
            for (StreamCutReferenceRecord stale : current.retentionRecordsBefore(streamCutReferenceRecord)) {
                deletions.add(deleteStreamCutRecordData(stale.getRecordingTime()));
            }

            return Futures.allOf(deletions)
                    .thenCompose(deleted -> Futures.toVoid(
                            updateRetentionSetData(new Data(trimmed.toBytes(), retentionData.getVersion()))));
        });
    }

    /**
     * Starts (or resumes) committing transactions for epoch {@code i}.
     *
     * <ul>
     *   <li>If a non-empty committing record already exists, it is returned as-is when its epoch
     *       equals {@code i}; otherwise an OPERATION_NOT_ALLOWED {@link StoreException} is thrown.</li>
     *   <li>If the committing record is empty and the epoch has no transactions in COMMITTING
     *       status, the existing (empty) versioned record is returned.</li>
     *   <li>Otherwise a new committing record listing those transactions is written and returned
     *       with its new version.</li>
     * </ul>
     *
     * @param i epoch whose transactions should be committed
     * @return future holding the versioned committing-transactions record
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> startCommittingTransactions(int i) {
        return getVersionedCommitTransactionsRecord().thenCompose(existing -> {
            CommittingTransactionsRecord inProgress = existing.getObject();
            if (!inProgress.equals(CommittingTransactionsRecord.EMPTY)) {
                // A commit is already in flight; it must be for the same epoch.
                if (i != inProgress.getEpoch()) {
                    throw StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Transactions on different epoch are being committed");
                }
                return CompletableFuture.completedFuture(existing);
            }
            return getTxnCommitList(i).thenCompose(committing -> {
                if (committing.isEmpty()) {
                    return CompletableFuture.completedFuture(existing);
                }
                CommittingTransactionsRecord fresh = CommittingTransactionsRecord.builder()
                        .epoch(i)
                        .transactionsToCommit(committing)
                        .m95build();
                return updateCommittingTxnRecord(new Data(fresh.toBytes(), existing.getVersion()))
                        .thenApply(newVersion -> new VersionedMetadata<>(fresh, newVersion));
            });
        });
    }

    /**
     * Lists the ids of the transactions in epoch {@code i} whose status is
     * {@code TxnStatus.COMMITTING}.
     *
     * @param i epoch to inspect
     * @return future holding the matching transaction ids
     */
    private CompletableFuture<List<UUID>> getTxnCommitList(int i) {
        return getTransactionsInEpoch(i).thenApply(txnsInEpoch -> {
            List<UUID> committing = new ArrayList<>();
            for (Map.Entry<UUID, ActiveTxnRecord> txn : txnsInEpoch.entrySet()) {
                if (txn.getValue().getTxnStatus().equals(TxnStatus.COMMITTING)) {
                    committing.add(txn.getKey());
                }
            }
            return committing;
        });
    }

    /**
     * Reads the committing-transactions record and pairs the decoded record with the version
     * of the data it was read from.
     *
     * @return future holding the versioned {@link CommittingTransactionsRecord}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedMetadata<CommittingTransactionsRecord>> getVersionedCommitTransactionsRecord() {
        return getCommitTxnRecord().thenApply(raw -> {
            CommittingTransactionsRecord decoded = CommittingTransactionsRecord.fromBytes(raw.getData());
            return new VersionedMetadata<>(decoded, raw.getVersion());
        });
    }

    /**
     * Commits every transaction listed in the given record, one after another, and then resets
     * the committing-transactions record to {@code CommittingTransactionsRecord.EMPTY}.
     *
     * <p>Commits are chained so that each starts only after the previous one has completed.
     * The reset is written with the version carried by {@code versionedMetadata}.
     *
     * @param versionedMetadata versioned record listing the transactions to commit
     * @return future that completes after the record has been reset
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeCommittingTransactions(VersionedMetadata<CommittingTransactionsRecord> versionedMetadata) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (UUID txnId : versionedMetadata.getObject().getTransactionsToCommit()) {
            log.debug("Committing transaction {} on stream {}/{}", txnId, this.scope, this.name);
            chain = chain
                    .thenCompose(previous -> commitTransaction(txnId))
                    .thenAccept(status ->
                            log.debug("transaction {} on stream {}/{} committed successfully", txnId, this.scope, this.name));
        }
        return chain.thenCompose(allCommitted -> {
            Data emptyRecord = new Data(CommittingTransactionsRecord.EMPTY.toBytes(), versionedMetadata.getVersion());
            return Futures.toVoid(updateCommittingTxnRecord(emptyRecord));
        });
    }

    /**
     * Fetches the raw transaction entries for epoch {@code i} and decodes them: keys are parsed
     * with {@code UUID.fromString} and values are deserialized into {@link ActiveTxnRecord}s.
     *
     * @param i epoch to read
     * @return future holding the decoded transactions keyed by id
     */
    private CompletableFuture<Map<UUID, ActiveTxnRecord>> getTransactionsInEpoch(int i) {
        return getTxnInEpoch(i).thenApply(rawTxns -> rawTxns.entrySet().stream()
                .collect(Collectors.toMap(
                        rawEntry -> UUID.fromString(rawEntry.getKey()),
                        rawEntry -> ActiveTxnRecord.fromBytes(rawEntry.getValue().getData()))));
    }

    /**
     * Creates the waiting-request node, if absent, containing the bytes of {@code str}.
     *
     * @param str value to store; encoded with the platform default charset
     * @return future returned by {@code createWaitingRequestNodeIfAbsent}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String str) {
        final byte[] encodedName = str.getBytes();
        return createWaitingRequestNodeIfAbsent(encodedName);
    }

    /**
     * Reads the waiting-request node and returns its content as a string.
     *
     * @return future holding the node content (decoded with the platform default charset),
     *         or {@code null} when the read fails with a
     *         {@code StoreException.DataNotFoundException}; any other failure is rethrown
     *         wrapped in a {@link CompletionException}
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<String> getWaitingRequestProcessor() {
        return getWaitingRequestNode().handle((node, failure) -> {
            if (failure != null) {
                if (Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException) {
                    // No node stored: report "no processor" rather than failing.
                    return null;
                }
                throw new CompletionException(failure);
            }
            return new String(node.getData());
        });
    }

    /**
     * Deletes the waiting-request node only when its current content equals {@code str}.
     *
     * @param str expected content of the waiting-request node
     * @return future that completes after the deletion, or immediately when the node is
     *         missing or holds a different value
     */
    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String str) {
        return getWaitingRequestProcessor().thenCompose(currentProcessor -> {
            if (currentProcessor != null && currentProcessor.equals(str)) {
                return deleteWaitingRequestNode();
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    /**
     * Verifies that the stream is in a state that permits operations.
     *
     * <p>The returned future fails with an ILLEGAL_STATE {@link StoreException} when the state is
     * {@code null}, {@code State.UNKNOWN} or {@code State.CREATING}; otherwise it completes
     * with {@code null}.
     *
     * @return future that completes normally only for a legal state
     */
    private CompletableFuture<Void> verifyLegalState() {
        return getState(false).thenApply(state -> {
            if (state == null || state.equals(State.UNKNOWN) || state.equals(State.CREATING)) {
                // The guard admits a null state, so the message must not dereference it;
                // otherwise callers would see a NullPointerException instead of ILLEGAL_STATE.
                String stateName = state == null ? "null" : state.name();
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + stateName);
            }
            return null;
        });
    }

    /**
     * Stores the serialized {@code epochRecord} under its own epoch number, if no data exists
     * for that epoch yet.
     *
     * @param epochRecord record to persist
     * @return future returned by {@code createEpochRecordDataIfAbsent}
     */
    private CompletableFuture<Void> createEpochRecord(EpochRecord epochRecord) {
        final int epoch = epochRecord.getEpoch();
        final byte[] serialized = epochRecord.toBytes();
        return createEpochRecordDataIfAbsent(epoch, serialized);
    }

    /**
     * Advances the stored current-epoch record to epoch {@code i}.
     *
     * <p>The record for epoch {@code i} is loaded and written as the current epoch record only
     * when the stored current epoch is strictly lower than {@code i}; otherwise nothing is
     * written.
     *
     * @param i epoch that should become current
     * @return future that completes after the conditional update
     */
    private CompletableFuture<Void> updateCurrentEpochRecord(int i) {
        return getEpochRecord(i).thenCompose(targetEpoch ->
                getCurrentEpochRecordData(true).thenCompose(currentData -> {
                    int storedEpoch = EpochRecord.fromBytes(currentData.getData()).getEpoch();
                    if (storedEpoch >= i) {
                        // Already at or beyond the requested epoch.
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    Data replacement = new Data(targetEpoch.toBytes(), currentData.getVersion());
                    return Futures.toVoid(updateCurrentEpochRecordData(replacement));
                }));
    }

    /**
     * Creates shard {@code i} of the sealed-segment size map, if absent, initialised with an
     * empty size map.
     *
     * @param i shard number
     * @return future returned by {@code createSealedSegmentSizesMapShardDataIfAbsent}
     */
    private CompletableFuture<Void> createSealedSegmentSizeMapShardIfAbsent(int i) {
        SealedSegmentsMapShard emptyShard = SealedSegmentsMapShard.builder()
                .shardNumber(i)
                .sealedSegmentsSizeMap(Collections.emptyMap())
                .m119build();
        return createSealedSegmentSizesMapShardDataIfAbsent(i, emptyShard.toBytes());
    }

    /**
     * Reads shard {@code i} of the sealed-segment size map.
     *
     * @param i shard number
     * @return future holding the decoded shard; when the read fails with a
     *         {@code StoreException.DataNotFoundException} an empty shard with number {@code i}
     *         is returned instead; any other failure is rethrown wrapped in a
     *         {@link CompletionException}
     */
    @VisibleForTesting
    CompletableFuture<SealedSegmentsMapShard> getSealedSegmentSizeMapShard(int i) {
        return getSealedSegmentSizesMapShardData(i).handle((shardData, failure) -> {
            if (failure != null) {
                if (Exceptions.unwrap(failure) instanceof StoreException.DataNotFoundException) {
                    return SealedSegmentsMapShard.builder()
                            .shardNumber(i)
                            .sealedSegmentsSizeMap(Collections.emptyMap())
                            .m119build();
                }
                throw new CompletionException(failure);
            }
            return SealedSegmentsMapShard.fromBytes(shardData.getData());
        });
    }

    /**
     * Records the sizes of sealed segments in the sharded sealed-segment size map.
     *
     * <p>Segment ids are grouped by {@link #getShardNumber(long)}. For each shard the shard is
     * created if absent, read, extended with the sizes of its segments, and written back using
     * the version that was read.
     *
     * <p>The shard write is chained with {@code thenCompose} so that the returned future
     * completes only after every shard has actually been written, and fails if any write fails.
     * (Returning the write future from {@code thenApply} would produce a nested future whose
     * completion and failure are never observed.)
     *
     * @param map sealed segment id to size
     * @return future that completes when all affected shards have been updated
     */
    private CompletableFuture<Void> updateSealedSegmentSizes(Map<Long, Long> map) {
        Map<Integer, List<Long>> segmentsByShard = map.keySet().stream()
                .collect(Collectors.groupingBy(this::getShardNumber));

        List<CompletableFuture<Version>> shardUpdates = segmentsByShard.entrySet().stream().map(shardEntry -> {
            int shard = shardEntry.getKey();
            List<Long> segmentIds = shardEntry.getValue();
            return createSealedSegmentSizeMapShardIfAbsent(shard)
                    .thenCompose(created -> getSealedSegmentSizesMapShardData(shard))
                    .thenCompose(shardData -> {
                        SealedSegmentsMapShard mapShard = SealedSegmentsMapShard.fromBytes(shardData.getData());
                        segmentIds.forEach(segmentId -> mapShard.addSealedSegmentSize(segmentId, map.get(segmentId)));
                        return updateSealedSegmentSizesMapShardData(shard, new Data(mapShard.toBytes(), shardData.getVersion()));
                    });
        }).collect(Collectors.toList());

        return Futures.allOf(shardUpdates);
    }

    /**
     * Maps a segment id to its shard number: the epoch extracted from the id by
     * {@code StreamSegmentNameUtils.getEpoch}, integer-divided by the configured shard size.
     *
     * @param j segment id
     * @return shard number for the segment
     */
    private int getShardNumber(long j) {
        final int epochOfSegment = StreamSegmentNameUtils.getEpoch(j);
        final int segmentsPerShard = this.shardSize.get();
        return epochOfSegment / segmentsPerShard;
    }

    /**
     * Builds a map from every segment of {@code epochRecord} to that record's epoch number.
     *
     * @param epochRecord epoch whose segments form the keys
     * @return map of segment to epoch number
     */
    private Map<StreamSegmentRecord, Integer> convertToSpan(EpochRecord epochRecord) {
        return epochRecord.getSegments().stream()
                .collect(Collectors.toMap(
                        segment -> segment,
                        segment -> epochRecord.getEpoch()));
    }

    /**
     * Converts a stored segment record into a {@link Segment} carrying the same id, creation
     * time and key range.
     *
     * @param streamSegmentRecord record to convert
     * @return the equivalent {@link Segment}
     */
    private Segment transform(StreamSegmentRecord streamSegmentRecord) {
        final Segment converted = new Segment(
                streamSegmentRecord.segmentId(),
                streamSegmentRecord.getCreationTime(),
                streamSegmentRecord.getKeyStart(),
                streamSegmentRecord.getKeyEnd());
        return converted;
    }

    /**
     * Converts each record of {@code list} into a {@link Segment}, preserving order.
     *
     * @param list records to convert
     * @return list of converted segments
     */
    private List<Segment> transform(List<StreamSegmentRecord> list) {
        return list.stream()
                .map(segmentRecord -> transform(segmentRecord))
                .collect(Collectors.toList());
    }

    /**
     * Converts each record of {@code set} into a {@link Segment}.
     *
     * @param set records to convert
     * @return set of converted segments
     */
    private Set<Segment> transform(Set<StreamSegmentRecord> set) {
        return set.stream()
                .map(segmentRecord -> transform(segmentRecord))
                .collect(Collectors.toSet());
    }

    /**
     * Fetches the epoch records from epoch {@code i} through epoch {@code i2} by walking the
     * history chunks that cover that range.
     *
     * <p>The chunk index containing the active epoch is computed first. Every chunk from
     * {@code i / historyChunkSize} through {@code i2 / historyChunkSize} is then read, starting
     * within each chunk at the larger of the chunk's first epoch and {@code i}. The boolean
     * passed to the per-chunk read is {@code true} for chunks at or beyond the active chunk.
     * Per-chunk results are concatenated in chunk order.
     *
     * <p>The pipeline is fully typed: the decompiler-emitted cast to an undeclared type
     * variable and the raw {@code List} casts have been removed.
     *
     * @param i  first epoch of the range
     * @param i2 last epoch of the range
     * @param z  forwarded to {@code getActiveEpochRecord}
     * @return future holding the epoch records collected from all chunks
     */
    @VisibleForTesting
    CompletableFuture<List<EpochRecord>> fetchEpochs(int i, int i2, boolean z) {
        return getActiveEpochRecord(z)
                .thenApply(activeEpoch -> activeEpoch.getEpoch() / this.historyChunkSize.get())
                .thenCompose(activeChunk -> {
                    List<CompletableFuture<List<EpochRecord>>> perChunk = IntStream
                            .range(i / this.historyChunkSize.get(), (i2 / this.historyChunkSize.get()) + 1)
                            .mapToObj(chunk -> {
                                // Start at the chunk's first epoch unless the requested range begins later.
                                int firstEpochInChunk = chunk * this.historyChunkSize.get();
                                int fromEpoch = Math.max(firstEpochInChunk, i);
                                return getEpochsFromHistoryChunk(chunk, fromEpoch, i2, chunk >= activeChunk);
                            })
                            .collect(Collectors.toList());
                    return Futures.allOfWithResults(perChunk);
                })
                .thenApply(chunks -> chunks.stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList()));
    }

    /**
     * Produces the epoch records for epochs {@code i2} through {@code i3} that fall in history
     * chunk {@code i}.
     *
     * <p>The record for epoch {@code i2} is loaded directly. Each history record in the chunk
     * with an epoch in {@code (i2, i3]} is then turned into an epoch record via
     * {@code newEpochRecord}, which is given the future of the immediately preceding record in
     * the result list.
     *
     * @param i  history chunk number
     * @param i2 first epoch to return (loaded directly)
     * @param i3 last epoch to return (inclusive)
     * @param z  forwarded to {@code getHistoryTimeSeriesChunk}
     * @return future holding the epoch records in order
     */
    private CompletableFuture<List<EpochRecord>> getEpochsFromHistoryChunk(int i, int i2, int i3, boolean z) {
        return getEpochRecord(i2).thenCompose(firstEpoch ->
                getHistoryTimeSeriesChunk(i, z).thenCompose(chunk -> {
                    List<CompletableFuture<EpochRecord>> epochFutures = new ArrayList<>();
                    epochFutures.add(CompletableFuture.completedFuture(firstEpoch));

                    for (HistoryTimeSeriesRecord historyRecord : chunk.getHistoryRecords()) {
                        int epoch = historyRecord.getEpoch();
                        if (epoch <= i2 || epoch > i3) {
                            continue;
                        }
                        CompletableFuture<EpochRecord> previous = epochFutures.get(epochFutures.size() - 1);
                        List<Long> sealedIds = historyRecord.getSegmentsSealed().stream()
                                .map(StreamSegmentRecord::segmentId)
                                .collect(Collectors.toList());
                        epochFutures.add(newEpochRecord(
                                previous,
                                historyRecord.getEpoch(),
                                historyRecord.getReferenceEpoch(),
                                historyRecord.getSegmentsCreated(),
                                sealedIds,
                                historyRecord.getScaleTime()));
                    }
                    return Futures.allOfWithResults(epochFutures);
                }));
    }

    /**
     * Produces the record for epoch {@code i}.
     *
     * <p>When {@code i} differs from its reference epoch {@code i2} the record is loaded via
     * {@code getEpochRecord}. Otherwise it is derived from the preceding epoch's record: the
     * sealed segments are removed from its segment list, the created segments are appended,
     * and a new record is built with creation time {@code j}.
     *
     * @param completableFuture future of the preceding epoch's record (only used when
     *                          {@code i == i2})
     * @param i                 epoch to produce
     * @param i2                reference epoch of {@code i}
     * @param collection        segments created in epoch {@code i}
     * @param collection2       ids of segments sealed in epoch {@code i}
     * @param j                 creation time for the derived record
     * @return future holding the epoch record
     */
    private CompletableFuture<EpochRecord> newEpochRecord(CompletableFuture<EpochRecord> completableFuture, int i, int i2, Collection<StreamSegmentRecord> collection, Collection<Long> collection2, long j) {
        if (i != i2) {
            return getEpochRecord(i);
        }
        return completableFuture.thenApply(previousEpoch -> {
            // Assertion: the supplied record must be for the immediately preceding epoch.
            if (!$assertionsDisabled && previousEpoch.getEpoch() != i - 1) {
                throw new AssertionError();
            }
            List<StreamSegmentRecord> segments = new LinkedList<>(previousEpoch.getSegments());
            segments.removeIf(segment -> collection2.contains(segment.segmentId()));
            segments.addAll(collection);
            return EpochRecord.builder()
                    .epoch(i)
                    .referenceEpoch(i2)
                    .segments(segments)
                    .creationTime(j)
                    .m100build();
        });
    }

    /**
     * Builds a segment record from a segment id by splitting the id into its epoch and segment
     * number with {@code StreamSegmentNameUtils} and delegating to the five-argument overload.
     *
     * @param j  segment id
     * @param j2 creation time
     * @param d  key range start
     * @param d2 key range end
     * @return the new segment record
     */
    private StreamSegmentRecord newSegmentRecord(long j, long j2, Double d, Double d2) {
        final int creationEpoch = StreamSegmentNameUtils.getEpoch(j);
        final int segmentNumber = StreamSegmentNameUtils.getSegmentNumber(j);
        return newSegmentRecord(creationEpoch, segmentNumber, j2, d, d2);
    }

    /**
     * Builds a segment record from its individual fields.
     *
     * @param i  creation epoch
     * @param i2 segment number
     * @param j  creation time
     * @param d  key range start (unboxed; must not be {@code null})
     * @param d2 key range end (unboxed; must not be {@code null})
     * @return the new segment record
     */
    private StreamSegmentRecord newSegmentRecord(int i, int i2, long j, Double d, Double d2) {
        return StreamSegmentRecord.builder()
                .creationEpoch(i)
                .segmentNumber(i2)
                .creationTime(j)
                .keyStart(d.doubleValue())
                .keyEnd(d2.doubleValue())
                .m140build();
    }

    /**
     * Finds the epoch in effect at time {@code j}.
     *
     * <p>History chunks from 0 through the chunk containing the active epoch are searched with
     * {@code searchEpochAtTime}. If the search reports {@code -1}, the active epoch is
     * returned when {@code j} is later than the active epoch's creation time, and epoch
     * {@code 0} otherwise.
     *
     * @param j time to look up
     * @param z forwarded to {@code getActiveEpoch}
     * @return future holding the epoch number
     */
    @VisibleForTesting
    CompletableFuture<Integer> findEpochAtTime(long j, boolean z) {
        return getActiveEpoch(z).thenCompose(activeEpoch -> {
            int lastChunk = activeEpoch.getEpoch() / this.historyChunkSize.get();
            // True only for the chunk that contains the active epoch.
            Predicate<Integer> isActiveChunk =
                    chunk -> chunk.intValue() == activeEpoch.getEpoch() / this.historyChunkSize.get();
            return searchEpochAtTime(0, lastChunk, isActiveChunk, j).thenApply(found -> {
                if (found.intValue() != -1) {
                    return found;
                }
                return j > activeEpoch.getCreationTime() ? Integer.valueOf(activeEpoch.getEpoch()) : Integer.valueOf(0);
            });
        });
    }

    /**
     * Binary-searches history chunks {@code i} through {@code i2} for the epoch in effect at
     * time {@code j}.
     *
     * <p>The middle chunk is loaded; if {@code j} lies before its first record's scale time the
     * lower half is searched, if after its last record's scale time the upper half is searched.
     * Otherwise the record chosen by {@code CollectionHelpers.findGreatestLowerBound} within the
     * chunk supplies the epoch.
     *
     * @param i         lowest chunk index to consider
     * @param i2        highest chunk index to consider
     * @param predicate evaluated on the chunk index; its result is passed as the boolean
     *                  argument of {@code getHistoryTimeSeriesChunk}
     * @param j         time to look up
     * @return future holding the epoch, or {@code -1} when the range is exhausted
     */
    private CompletableFuture<Integer> searchEpochAtTime(int i, int i2, Predicate<Integer> predicate, long j) {
        if (i > i2) {
            return CompletableFuture.completedFuture(-1);
        }
        final int middle = (i + i2) / 2;
        return getHistoryTimeSeriesChunk(middle, predicate.test(middle)).thenCompose(chunk -> {
            List<HistoryTimeSeriesRecord> records = chunk.getHistoryRecords();
            long earliest = records.get(0).getScaleTime();
            long latest = records.get(records.size() - 1).getScaleTime();
            if (j < earliest) {
                return searchEpochAtTime(i, middle - 1, predicate, j);
            }
            if (j > latest) {
                return searchEpochAtTime(middle + 1, i2, predicate, j);
            }
            int index = CollectionHelpers.findGreatestLowerBound(records,
                    record -> Long.compare(j, record.getScaleTime()));
            // Assertion: j is within [earliest, latest], so a lower bound must exist.
            if (!$assertionsDisabled && index < 0) {
                throw new AssertionError();
            }
            return CompletableFuture.completedFuture(Integer.valueOf(records.get(index).getEpoch()));
        });
    }

    /**
     * Loads and decodes history chunk {@code i}.
     *
     * <p>When {@code z} is {@code false} and the decoded chunk holds fewer than
     * {@code historyChunkSize} records, the chunk is read again with {@code z = true}.
     *
     * @param i chunk number
     * @param z forwarded to {@code getHistoryTimeSeriesChunkData}
     * @return future holding the decoded chunk
     */
    private CompletableFuture<HistoryTimeSeries> getHistoryTimeSeriesChunk(int i, boolean z) {
        return getHistoryTimeSeriesChunkData(i, z).thenCompose(chunkData -> {
            HistoryTimeSeries chunk = HistoryTimeSeries.fromBytes(chunkData.getData());
            boolean rereadNeeded = !z && chunk.getHistoryRecords().size() < this.historyChunkSize.get();
            if (rereadNeeded) {
                return getHistoryTimeSeriesChunk(i, true);
            }
            return CompletableFuture.completedFuture(chunk);
        });
    }

    // -------------------------------------------------------------------------------------
    // Abstract storage primitives.
    //
    // Everything below is package-private and has no body in this class; a concrete subclass
    // must supply the implementation. All methods return CompletableFuture. The group
    // headings are derived from the method names only.
    // -------------------------------------------------------------------------------------

    // Scope / stream existence and lifecycle.
    abstract CompletableFuture<Void> checkScopeExists() throws StoreException;

    abstract CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j, int i);

    abstract CompletableFuture<Void> storeCreationTimeIfAbsent(long j);

    abstract CompletableFuture<Void> deleteStream();

    // Configuration data.
    abstract CompletableFuture<Void> createConfigurationIfAbsent(byte[] bArr);

    abstract CompletableFuture<Version> setConfigurationData(Data data);

    abstract CompletableFuture<Data> getConfigurationData(boolean z);

    // Truncation data.
    abstract CompletableFuture<Void> createTruncationDataIfAbsent(byte[] bArr);

    abstract CompletableFuture<Version> setTruncationData(Data data);

    abstract CompletableFuture<Data> getTruncationData(boolean z);

    // State data.
    abstract CompletableFuture<Void> createStateIfAbsent(byte[] bArr);

    abstract CompletableFuture<Version> setStateData(Data data);

    abstract CompletableFuture<Data> getStateData(boolean z);

    // Retention set and stream-cut records (stream-cut records are keyed by a long).
    abstract CompletableFuture<Void> createRetentionSetDataIfAbsent(byte[] bArr);

    abstract CompletableFuture<Void> createStreamCutRecordData(long j, byte[] bArr);

    abstract CompletableFuture<Data> getStreamCutRecordData(long j);

    abstract CompletableFuture<Void> deleteStreamCutRecordData(long j);

    abstract CompletableFuture<Version> updateRetentionSetData(Data data);

    abstract CompletableFuture<Data> getRetentionSetData();

    // History time-series chunks (keyed by chunk number).
    abstract CompletableFuture<Void> createHistoryTimeSeriesChunkDataIfAbsent(int i, byte[] bArr);

    abstract CompletableFuture<Data> getHistoryTimeSeriesChunkData(int i, boolean z);

    abstract CompletableFuture<Version> updateHistoryTimeSeriesChunkData(int i, Data data);

    // Current epoch record and per-epoch records.
    abstract CompletableFuture<Void> createCurrentEpochRecordDataIfAbsent(byte[] bArr);

    abstract CompletableFuture<Version> updateCurrentEpochRecordData(Data data);

    abstract CompletableFuture<Data> getCurrentEpochRecordData(boolean z);

    abstract CompletableFuture<Void> createEpochRecordDataIfAbsent(int i, byte[] bArr);

    abstract CompletableFuture<Data> getEpochRecordData(int i);

    // Sealed-segment size map shards (keyed by shard number) and segment-sealed records.
    abstract CompletableFuture<Void> createSealedSegmentSizesMapShardDataIfAbsent(int i, byte[] bArr);

    abstract CompletableFuture<Data> getSealedSegmentSizesMapShardData(int i);

    abstract CompletableFuture<Version> updateSealedSegmentSizesMapShardData(int i, Data data);

    abstract CompletableFuture<Void> createSegmentSealedEpochRecordData(long j, int i);

    abstract CompletableFuture<Data> getSegmentSealedRecordData(long j);

    // Transactions: active entries are addressed by (int, UUID), completed entries by UUID.
    abstract CompletableFuture<Version> createNewTransaction(int i, UUID uuid, byte[] bArr);

    abstract CompletableFuture<Data> getActiveTx(int i, UUID uuid);

    abstract CompletableFuture<Version> updateActiveTx(int i, UUID uuid, Data data);

    abstract CompletableFuture<Data> getCompletedTx(UUID uuid);

    abstract CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid);

    abstract CompletableFuture<Void> createCompletedTxEntry(UUID uuid, byte[] bArr);

    abstract CompletableFuture<Map<String, Data>> getCurrentTxns();

    abstract CompletableFuture<Map<String, Data>> getTxnInEpoch(int i);

    // Markers (keyed by a long; see setColdMarker / getColdMarker / removeColdMarker).
    abstract CompletableFuture<Void> createMarkerData(long j, long j2);

    abstract CompletableFuture<Version> updateMarkerData(long j, Data data);

    abstract CompletableFuture<Void> removeMarkerData(long j);

    abstract CompletableFuture<Data> getMarkerData(long j);

    // Epoch transition node.
    abstract CompletableFuture<Void> createEpochTransitionIfAbsent(byte[] bArr);

    abstract CompletableFuture<Version> updateEpochTransitionNode(Data data);

    abstract CompletableFuture<Data> getEpochTransitionNode();

    // Committing-transactions record.
    abstract CompletableFuture<Void> createCommitTxnRecordIfAbsent(byte[] bArr);

    abstract CompletableFuture<Data> getCommitTxnRecord();

    abstract CompletableFuture<Version> updateCommittingTxnRecord(Data data);

    // Waiting-request node.
    abstract CompletableFuture<Void> createWaitingRequestNodeIfAbsent(byte[] bArr);

    abstract CompletableFuture<Data> getWaitingRequestNode();

    abstract CompletableFuture<Void> deleteWaitingRequestNode();

    // Static initializer as rendered by the decompiler: stores the negated assertion status of
    // PersistentStreamBase in $assertionsDisabled (consulted by the assertion checks in
    // newEpochRecord and searchEpochAtTime) and creates the class logger.
    static {
        $assertionsDisabled = !PersistentStreamBase.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PersistentStreamBase.class);
    }
}
