package io.pravega.controller.store.stream;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BitConverter;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.server.rest.generated.api.ApiResponseMessage;
import io.pravega.controller.store.stream.EpochTransitionOperationExceptions;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.tables.ActiveTxnRecord;
import io.pravega.controller.store.stream.tables.CommittingTransactionsRecord;
import io.pravega.controller.store.stream.tables.CompletedTxnRecord;
import io.pravega.controller.store.stream.tables.Data;
import io.pravega.controller.store.stream.tables.EpochTransitionRecord;
import io.pravega.controller.store.stream.tables.HistoryIndexRecord;
import io.pravega.controller.store.stream.tables.HistoryRecord;
import io.pravega.controller.store.stream.tables.RetentionRecord;
import io.pravega.controller.store.stream.tables.SealedSegmentsRecord;
import io.pravega.controller.store.stream.tables.SegmentRecord;
import io.pravega.controller.store.stream.tables.State;
import io.pravega.controller.store.stream.tables.StateRecord;
import io.pravega.controller.store.stream.tables.StreamConfigurationRecord;
import io.pravega.controller.store.stream.tables.StreamCutRecord;
import io.pravega.controller.store.stream.tables.StreamTruncationRecord;
import io.pravega.controller.store.stream.tables.TableHelper;
import io.pravega.shared.segment.StreamSegmentNameUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase.class */
public abstract class PersistentStreamBase<T> implements Stream {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log;
    private final String scope;
    private final String name;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* renamed from: io.pravega.controller.store.stream.PersistentStreamBase$1, reason: invalid class name */
    /* loaded from: input_file:io/pravega/controller/store/stream/PersistentStreamBase$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$pravega$controller$store$stream$TxnStatus = new int[TxnStatus.values().length];

        static {
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.ABORTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.OPEN.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.COMMITTED.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$pravega$controller$store$stream$TxnStatus[TxnStatus.UNKNOWN.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentStreamBase(String str, String str2) {
        this.scope = str;
        this.name = str2;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getScope() {
        return this.scope;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getName() {
        return this.name;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public String getScopeName() {
        return this.scope;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<CreateStreamResponse> create(StreamConfiguration streamConfiguration, long j) {
        return checkScopeExists().thenCompose(r9 -> {
            return checkStreamExists(streamConfiguration, j);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) createStreamResponse -> {
            return storeCreationTimeIfAbsent(createStreamResponse.getTimestamp()).thenCompose(r5 -> {
                return createConfigurationIfAbsent(StreamConfigurationRecord.complete(createStreamResponse.getConfiguration()));
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r4 -> {
                return createTruncationDataIfAbsent(StreamTruncationRecord.EMPTY);
            }).thenCompose(r42 -> {
                return createStateIfAbsent(State.CREATING);
            }).thenCompose(r7 -> {
                return createNewSegmentTableWithIndex(createStreamResponse.getConfiguration(), createStreamResponse.getTimestamp());
            }).thenCompose(r72 -> {
                return createHistoryIndexIfAbsent(new Data<>(TableHelper.createHistoryIndex(), null));
            }).thenCompose(r8 -> {
                return createHistoryTableIfAbsent(new Data<>(TableHelper.createHistoryTable(createStreamResponse.getTimestamp(), (List) IntStream.range(0, createStreamResponse.getConfiguration().getScalingPolicy().getMinNumSegments()).boxed().map(num -> {
                    return Long.valueOf(StreamSegmentNameUtils.computeSegmentId(num.intValue(), 0));
                }).collect(Collectors.toList())), null));
            }).thenCompose(r6 -> {
                return createSealedSegmentsRecord(new SealedSegmentsRecord(Collections.emptyMap()).toByteArray());
            }).thenCompose(r62 -> {
                return createRetentionSet(new RetentionRecord(Collections.emptyList()).toByteArray());
            }).thenApply(r3 -> {
                return createStreamResponse;
            });
        });
    }

    private CompletableFuture<Void> createNewSegmentTableWithIndex(StreamConfiguration streamConfiguration, long j) {
        int minNumSegments = streamConfiguration.getScalingPolicy().getMinNumSegments();
        double d = 1.0d / minNumSegments;
        Pair<byte[], byte[]> createSegmentTableAndIndex = TableHelper.createSegmentTableAndIndex((List) IntStream.range(0, minNumSegments).boxed().map(num -> {
            return new AbstractMap.SimpleEntry(Double.valueOf(num.intValue() * d), Double.valueOf((num.intValue() + 1) * d));
        }).collect(Collectors.toList()), j);
        return createSegmentIndexIfAbsent(new Data<>((byte[]) createSegmentTableAndIndex.getKey(), null)).thenCompose(r8 -> {
            return createSegmentTableIfAbsent(new Data<>((byte[]) createSegmentTableAndIndex.getValue(), null));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> delete() {
        return deleteStream();
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startTruncation(Map<Long, Long> map) {
        return Futures.allOfWithResults((List) map.keySet().stream().map(l -> {
            return getSegment(l.longValue()).thenApply(segment -> {
                return new AbstractMap.SimpleEntry(Double.valueOf(segment.getKeyStart()), Double.valueOf(segment.getKeyEnd()));
            });
        }).collect(Collectors.toList())).thenAccept((Consumer) TableHelper::validateStreamCut).thenCompose(r6 -> {
            return getTruncationData(true).thenCompose(data -> {
                Preconditions.checkNotNull(data);
                StreamTruncationRecord parse = StreamTruncationRecord.parse(data.getData());
                Exceptions.checkArgument(!parse.isUpdating(), "TruncationRecord", "Truncation record conflict", new Object[0]);
                return computeTruncationRecord(parse, map).thenCompose(streamTruncationRecord -> {
                    return setTruncationData(new Data<>(streamTruncationRecord.toByteArray(), data.getVersion()));
                });
            });
        });
    }

    private CompletableFuture<StreamTruncationRecord> computeTruncationRecord(StreamTruncationRecord streamTruncationRecord, Map<Long, Long> map) {
        log.debug("computing truncation for stream {}/{}", this.scope, this.name);
        return getHistoryIndexFromStore().thenCompose(data -> {
            return getHistoryTableFromStore().thenCompose(data -> {
                return getSegmentIndexFromStore().thenCompose(data -> {
                    return getSegmentTableFromStore().thenApply(data -> {
                        return TableHelper.computeTruncationRecord(data.getData(), data.getData(), data.getData(), data.getData(), map, streamTruncationRecord);
                    });
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeTruncation() {
        return checkState(state -> {
            return state.equals(State.TRUNCATING);
        }).thenCompose(r4 -> {
            return getTruncationData(true).thenCompose(data -> {
                Preconditions.checkNotNull(data);
                StreamTruncationRecord parse = StreamTruncationRecord.parse(data.getData());
                return parse.isUpdating() ? setTruncationData(new Data<>(StreamTruncationRecord.complete(parse).toByteArray(), data.getVersion())) : CompletableFuture.completedFuture(null);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamTruncationRecord> getTruncationRecord(boolean z) {
        return getTruncationData(z).thenApply(data -> {
            return data == null ? StreamTruncationRecord.EMPTY : StreamTruncationRecord.parse(data.getData());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> startUpdateConfiguration(StreamConfiguration streamConfiguration) {
        return getConfigurationData(true).thenCompose(data -> {
            StreamConfigurationRecord parse = StreamConfigurationRecord.parse(data.getData());
            Preconditions.checkNotNull(parse);
            Preconditions.checkArgument(!parse.isUpdating());
            return setConfigurationData(new Data<>(StreamConfigurationRecord.update(streamConfiguration).toByteArray(), data.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> completeUpdateConfiguration() {
        return checkState(state -> {
            return state.equals(State.UPDATING);
        }).thenCompose(r4 -> {
            return getConfigurationData(true).thenCompose(data -> {
                StreamConfigurationRecord parse = StreamConfigurationRecord.parse(data.getData());
                Preconditions.checkNotNull(parse);
                if (!parse.isUpdating()) {
                    return CompletableFuture.completedFuture(null);
                }
                StreamConfigurationRecord complete = StreamConfigurationRecord.complete(parse.getStreamConfiguration());
                log.debug("Completing update configuration for stream {}/{}", this.scope, this.name);
                return setConfigurationData(new Data<>(complete.toByteArray(), data.getVersion()));
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamConfiguration> getConfiguration() {
        return getConfigurationRecord(false).thenApply((v0) -> {
            return v0.getStreamConfiguration();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<StreamConfigurationRecord> getConfigurationRecord(boolean z) {
        return getConfigurationData(z).thenApply(data -> {
            return StreamConfigurationRecord.parse(data.getData());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Boolean> updateState(State state) {
        return getStateData(true).thenCompose(data -> {
            return State.isTransitionAllowed(StateRecord.parse(data.getData()).getState(), state) ? setStateData(new Data<>(StateRecord.builder().state(state).m102build().toByteArray(), data.getVersion())).thenApply(r2 -> {
                return true;
            }) : Futures.failedFuture(StoreException.create(StoreException.Type.OPERATION_NOT_ALLOWED, "Stream: " + getName() + " State: " + state.name() + " current state = " + StateRecord.parse(data.getData()).getState()));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<State> getState(boolean z) {
        return getStateData(z).thenApply(data -> {
            return StateRecord.parse(data.getData()).getState();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Segment> getSegment(long j) {
        return verifyLegalState().thenCompose(r7 -> {
            return getSegmentRow(j);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<ScaleMetadata>> getScaleMetadata() {
        return verifyLegalState().thenCompose(r3 -> {
            return getHistoryIndex();
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) data -> {
            return getHistoryTable().thenApply(data -> {
                return TableHelper.getScaleMetadata(data.getData(), data.getData());
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) list -> {
                return Futures.allOfWithResults((List) list.stream().map(pair -> {
                    long longValue = ((Long) pair.getLeft()).longValue();
                    return Futures.allOfWithResults((List) ((List) pair.getRight()).stream().map((v1) -> {
                        return getSegment(v1);
                    }).collect(Collectors.toList())).thenApply(list -> {
                        return new ImmutablePair(Long.valueOf(longValue), list);
                    });
                }).collect(Collectors.toList()));
            }).thenApply(this::mapToScaleMetadata);
        });
    }

    private List<ScaleMetadata> mapToScaleMetadata(List<ImmutablePair<Long, List<Segment>>> list) {
        AtomicReference atomicReference = new AtomicReference();
        return (List) list.stream().map(immutablePair -> {
            long j = 0;
            long j2 = 0;
            if (atomicReference.get() != null) {
                j = findSegmentSplitsMerges((List) atomicReference.get(), (List) immutablePair.right);
                j2 = findSegmentSplitsMerges((List) immutablePair.right, (List) atomicReference.get());
            }
            atomicReference.set(immutablePair.getRight());
            return new ScaleMetadata(((Long) immutablePair.left).longValue(), (List) immutablePair.right, j, j2);
        }).collect(Collectors.toList());
    }

    private long findSegmentSplitsMerges(List<Segment> list, List<Segment> list2) {
        return list.stream().filter(segment -> {
            return list2.stream().filter(segment -> {
                return segment.overlaps(segment);
            }).count() > 1;
        }).count();
    }

    private CompletableFuture<List<Segment>> findOverlapping(Segment segment, List<Long> list) {
        return verifyLegalState().thenCompose(r5 -> {
            return Futures.allOfWithResults((List) list.stream().map((v1) -> {
                return getSegment(v1);
            }).collect(Collectors.toList()));
        }).thenApply((Function<? super U, ? extends U>) list2 -> {
            return (List) list2.stream().filter(segment2 -> {
                return segment2.overlaps(segment);
            }).collect(Collectors.toList());
        });
    }

    private CompletableFuture<List<Segment>> getSuccessorsForSegment(long j) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenCompose(data -> {
                return getSegment(j).thenCompose(segment -> {
                    return findOverlapping(segment, TableHelper.findSegmentSuccessorCandidates(segment, data.getData(), data.getData()));
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<Long, List<Long>>> getSuccessorsWithPredecessors(long j) {
        return verifyLegalState().thenCompose(r8 -> {
            return getHistoryIndex().thenCompose(data -> {
                return getHistoryTable().thenCompose(data -> {
                    return getSuccessorsForSegment(j).thenCompose(list -> {
                        ArrayList arrayList = new ArrayList();
                        Iterator it = list.iterator();
                        while (it.hasNext()) {
                            Segment segment = (Segment) it.next();
                            arrayList.add(findOverlapping(segment, TableHelper.findSegmentPredecessorCandidates(segment, data.getData(), data.getData())).thenApply(list -> {
                                return new AbstractMap.SimpleImmutableEntry(segment, list.stream().map((v0) -> {
                                    return v0.segmentId();
                                }).collect(Collectors.toList()));
                            }));
                        }
                        return Futures.allOfWithResults(arrayList);
                    }).thenApply((Function<? super U, ? extends U>) list2 -> {
                        return (Map) list2.stream().collect(Collectors.toMap(entry -> {
                            return Long.valueOf(((Segment) entry.getKey()).segmentId());
                        }, (v0) -> {
                            return v0.getValue();
                        }));
                    });
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Long>> getActiveSegments() {
        return verifyLegalState().thenCompose(r4 -> {
            return getHistoryIndex().thenCompose(data -> {
                return getHistoryTable().thenApply(data -> {
                    return TableHelper.getActiveSegments(data.getData(), data.getData());
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Long>> getActiveSegments(long j) {
        return getTruncationRecord(false).thenCompose(streamTruncationRecord -> {
            return getHistoryIndex().thenCompose(data -> {
                return getHistoryTable().thenCompose(data -> {
                    return getSegmentIndex().thenCompose(data -> {
                        return getSegmentTable().thenApply(data -> {
                            return TableHelper.getActiveSegments(j, data.getData(), data.getData(), data.getData(), data.getData(), streamTruncationRecord);
                        });
                    });
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Long>> getActiveSegments(int i) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenApply(data -> {
                return TableHelper.getSegmentsInEpoch(data.getData(), data.getData(), i);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<Segment>> getSegmentsBetweenStreamCuts(Map<Long, Long> map, Map<Long, Long> map2) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenCompose(data -> {
                return getSegmentIndex().thenCompose(data -> {
                    return getSegmentTable().thenApply(data -> {
                        return TableHelper.findSegmentsBetweenStreamCuts(data.getData(), data.getData(), data.getData(), data.getData(), map, map2);
                    });
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<EpochTransitionRecord> startScale(List<Long> list, List<AbstractMap.SimpleEntry<Double, Double>> list2, long j, boolean z) {
        return verifyNotSealed().thenCompose(r14 -> {
            return getHistoryIndexFromStore().thenCompose(data -> {
                return getHistoryTableFromStore().thenCompose(data -> {
                    return getSegmentIndexFromStore().thenCompose(data -> {
                        return getSegmentTableFromStore().thenCompose(data -> {
                            if (TableHelper.isScaleInputValid(list, list2, data.getData(), data.getData())) {
                                return startScale(list, list2, j, z, data, data, data, data);
                            }
                            log.error("scale input invalid {} {}", list, list2);
                            throw new EpochTransitionOperationExceptions.InputInvalidException();
                        });
                    });
                });
            });
        });
    }

    private CompletableFuture<EpochTransitionRecord> startScale(List<Long> list, List<AbstractMap.SimpleEntry<Double, Double>> list2, long j, boolean z, Data<T> data, Data<T> data2, Data<T> data3, Data<T> data4) {
        return getEpochTransition().thenCompose(epochTransitionRecord -> {
            if (epochTransitionRecord == null) {
                return resetStateConditionally(State.SCALING).thenCompose(r19 -> {
                    if (z) {
                        log.info("scale not started, retry later.");
                        throw new TaskExceptions.StartException("Scale not started yet.");
                    }
                    if (TableHelper.canScaleFor(list, data.getData(), data2.getData())) {
                        EpochTransitionRecord computeEpochTransition = TableHelper.computeEpochTransition(data.getData(), data2.getData(), data3.getData(), data4.getData(), list, list2, j);
                        return createEpochTransitionNode(computeEpochTransition.toByteArray()).handle((r9, th) -> {
                            if (Exceptions.unwrap(th) instanceof StoreException.DataExistsException) {
                                log.debug("scale conflict, another scale operation is ongoing");
                                throw new EpochTransitionOperationExceptions.ConflictException();
                            }
                            log.info("scale for stream {}/{} accepted. Segments to seal = {}", new Object[]{this.scope, this.name, computeEpochTransition.getSegmentsToSeal()});
                            return computeEpochTransition;
                        });
                    }
                    log.warn("scale precondition failed {}", list);
                    throw new EpochTransitionOperationExceptions.PreConditionFailureException();
                });
            }
            if (verifyRecordMatchesInput(list, list2, z, epochTransitionRecord)) {
                return CompletableFuture.completedFuture(epochTransitionRecord);
            }
            log.debug("scale conflict, another scale operation is ongoing");
            throw new EpochTransitionOperationExceptions.ConflictException();
        });
    }

    private boolean verifyRecordMatchesInput(List<Long> list, List<AbstractMap.SimpleEntry<Double, Double>> list2, boolean z, EpochTransitionRecord epochTransitionRecord) {
        boolean allMatch = list2.stream().allMatch(simpleEntry -> {
            return epochTransitionRecord.getNewSegmentsWithRange().values().stream().anyMatch(simpleEntry -> {
                return ((Double) simpleEntry.getKey()).equals(simpleEntry.getKey()) && ((Double) simpleEntry.getValue()).equals(simpleEntry.getValue());
            });
        });
        java.util.stream.Stream stream = epochTransitionRecord.getSegmentsToSeal().stream();
        list.getClass();
        return allMatch && (stream.allMatch((v1) -> {
            return r1.contains(v1);
        }) || (z && ((Set) epochTransitionRecord.getSegmentsToSeal().stream().map((v0) -> {
            return StreamSegmentNameUtils.getSegmentNumber(v0);
        }).collect(Collectors.toSet())).equals(list.stream().map((v0) -> {
            return StreamSegmentNameUtils.getSegmentNumber(v0);
        }).collect(Collectors.toSet()))));
    }

    private CompletableFuture<Void> verifyNotSealed() {
        return getState(false).thenApply(state -> {
            if (state.equals(State.SEALING) || state.equals(State.SEALED)) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + state.name());
            }
            return null;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> scaleCreateNewSegments(boolean z) {
        return checkState(state -> {
            return state.equals(State.SCALING);
        }).thenCompose(r6 -> {
            return getHistoryIndexFromStore().thenCompose(data -> {
                return getHistoryTableFromStore().thenCompose(data -> {
                    return getSegmentIndexFromStore().thenCompose(data -> {
                        return getSegmentTableFromStore().thenCompose(data -> {
                            return getEpochTransition().thenCompose(epochTransitionRecord -> {
                                return z ? migrateManualScaleToNewEpoch(epochTransitionRecord, data, data, data, data) : CompletableFuture.completedFuture(epochTransitionRecord);
                            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) epochTransitionRecord2 -> {
                                int newEpoch = epochTransitionRecord2.getNewEpoch();
                                SegmentRecord latestSegmentRecord = TableHelper.getLatestSegmentRecord(data.getData(), data.getData());
                                HistoryRecord activeEpoch = TableHelper.getActiveEpoch(data.getData(), data.getData());
                                if (latestSegmentRecord.getCreationEpoch() >= newEpoch || activeEpoch.getEpoch() != epochTransitionRecord2.getActiveEpoch()) {
                                    return discardInconsistentEpochTransition(data, data, data, data, epochTransitionRecord2);
                                }
                                log.info("Scale {}/{} for segments started. Creating new segments. SegmentsToSeal {}", new Object[]{this.scope, this.name, epochTransitionRecord2.getSegmentsToSeal()});
                                return createNewSegments(data.getData(), data.getData(), data, data, epochTransitionRecord2);
                            });
                        });
                    });
                });
            });
        });
    }

    private CompletableFuture<Void> discardInconsistentEpochTransition(Data<T> data, Data<T> data2, Data<T> data3, Data<T> data4, EpochTransitionRecord epochTransitionRecord) {
        if (!TableHelper.isEpochTransitionConsistent(epochTransitionRecord, data.getData(), data2.getData(), data3.getData(), data4.getData())) {
            return deleteEpochTransitionNode().thenCompose(r4 -> {
                return resetStateConditionally(State.SCALING);
            }).thenAccept((Consumer<? super U>) r5 -> {
                log.warn("Scale epoch transition record is inconsistent with data in the table. {}", Integer.valueOf(epochTransitionRecord.getNewEpoch()));
                throw new IllegalStateException("Epoch transition record is inconsistent.");
            });
        }
        log.debug("CreateNewSegments step for stream {}/{} is idempotent, segments are already present in segment table.", this.scope, this.name);
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> createNewSegments(byte[] bArr, byte[] bArr2, Data<T> data, Data<T> data2, EpochTransitionRecord epochTransitionRecord) {
        long max = Math.max(Math.max(System.currentTimeMillis(), epochTransitionRecord.getTime()), TableHelper.getEpochScaleTime(epochTransitionRecord.getActiveEpoch(), bArr, bArr2) + 1);
        if (!$assertionsDisabled && epochTransitionRecord.getNewSegmentsWithRange().isEmpty()) {
            throw new AssertionError();
        }
        Pair<byte[], byte[]> addNewSegmentsToSegmentTableAndIndex = TableHelper.addNewSegmentsToSegmentTableAndIndex(epochTransitionRecord.getNewSegmentsWithRange().keySet().stream().mapToInt((v0) -> {
            return StreamSegmentNameUtils.getSegmentNumber(v0);
        }).min().getAsInt(), epochTransitionRecord.getNewEpoch(), data.getData(), data2.getData(), (List) epochTransitionRecord.getNewSegmentsWithRange().entrySet().stream().sorted(Comparator.comparingLong((v0) -> {
            return v0.getKey();
        })).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList()), max);
        Data<T> data3 = new Data<>((byte[]) addNewSegmentsToSegmentTableAndIndex.getKey(), data.getVersion());
        Data data4 = new Data((byte[]) addNewSegmentsToSegmentTableAndIndex.getValue(), data2.getVersion());
        return updateSegmentIndex(data3).thenCompose(r5 -> {
            return updateSegmentTable(data4);
        }).thenAccept((Consumer<? super U>) r6 -> {
            log.info("scale {}/{} new segments created successfully", this.scope, this.name);
        });
    }

    private CompletionStage<EpochTransitionRecord> migrateManualScaleToNewEpoch(EpochTransitionRecord epochTransitionRecord, Data<T> data, Data<T> data2, Data<T> data3, Data<T> data4) {
        HistoryRecord activeEpoch = TableHelper.getActiveEpoch(data.getData(), data2.getData());
        HistoryRecord epochRecord = TableHelper.getEpochRecord(data.getData(), data2.getData(), epochTransitionRecord.getActiveEpoch());
        if (epochTransitionRecord.getActiveEpoch() == activeEpoch.getEpoch()) {
            return CompletableFuture.completedFuture(epochTransitionRecord);
        }
        if (activeEpoch.getEpoch() <= epochTransitionRecord.getActiveEpoch() || activeEpoch.getReferenceEpoch() != epochRecord.getReferenceEpoch()) {
            return deleteEpochTransitionNode().thenCompose(r4 -> {
                return resetStateConditionally(State.SCALING);
            }).thenApply((Function<? super U, ? extends U>) r5 -> {
                log.warn("Scale epoch transition record is inconsistent with data in the table. {}", Integer.valueOf(epochTransitionRecord.getNewEpoch()));
                throw new IllegalStateException("Epoch transition record is inconsistent.");
            });
        }
        EpochTransitionRecord computeEpochTransition = TableHelper.computeEpochTransition(data.getData(), data2.getData(), data3.getData(), data4.getData(), (List) epochTransitionRecord.getSegmentsToSeal().stream().map(l -> {
            return Long.valueOf(StreamSegmentNameUtils.computeSegmentId(StreamSegmentNameUtils.getSegmentNumber(l.longValue()), activeEpoch.getEpoch()));
        }).collect(Collectors.toList()), epochTransitionRecord.getNewSegmentsWithRange().values().asList(), epochTransitionRecord.getTime());
        return updateEpochTransitionNode(computeEpochTransition.toByteArray()).thenApply(r3 -> {
            return computeEpochTransition;
        });
    }

    private CompletableFuture<EpochTransitionRecord> getEpochTransition() {
        return getEpochTransitionNode().handle((data, th) -> {
            if (th == null) {
                return EpochTransitionRecord.parse(data.getData());
            }
            Throwable unwrap = Exceptions.unwrap(th);
            if (unwrap instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(unwrap);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> scaleNewSegmentsCreated() {
        return checkState(state -> {
            return state.equals(State.SCALING);
        }).thenCompose(r4 -> {
            return getEpochTransition().thenCompose(this::addPartialHistoryRecordAndIndex);
        });
    }

    private CompletableFuture<Void> addPartialHistoryRecordAndIndex(EpochTransitionRecord epochTransitionRecord) {
        ImmutableSet<Long> segmentsToSeal = epochTransitionRecord.getSegmentsToSeal();
        ImmutableSet keySet = epochTransitionRecord.getNewSegmentsWithRange().keySet();
        int activeEpoch = epochTransitionRecord.getActiveEpoch();
        int newEpoch = epochTransitionRecord.getNewEpoch();
        return getHistoryIndexFromStore().thenCompose(data -> {
            return getHistoryTableFromStore().thenCompose(data -> {
                HistoryRecord historyRecord = HistoryRecord.readLatestRecord(data.getData(), data.getData(), false).get();
                if (historyRecord.getEpoch() <= activeEpoch) {
                    List<Long> newActiveSegments = getNewActiveSegments(keySet, segmentsToSeal, historyRecord);
                    int length = data.getData().length;
                    Data data = new Data(TableHelper.addPartialRecordToHistoryTable(data.getData(), data.getData(), newActiveSegments), data.getVersion());
                    return addHistoryIndexRecord(newEpoch, length).thenCompose(r5 -> {
                        return updateHistoryTable(data);
                    }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r9, th) -> {
                        if (th == null) {
                            log.debug("{}/{} scale op for epoch {}. Creating new epoch and updating history table.", new Object[]{this.scope, this.name, Integer.valueOf(activeEpoch)});
                        } else {
                            log.warn("{}/{} scale op for epoch {}. Failed to add partial record to history table. {}", new Object[]{this.scope, this.name, Integer.valueOf(activeEpoch), th.getClass().getName()});
                        }
                    });
                }
                boolean z = historyRecord.isPartial() && historyRecord.getSegments().containsAll(keySet);
                if (z) {
                    java.util.stream.Stream<Long> stream = HistoryRecord.fetchPrevious(historyRecord, data.getData(), data.getData()).get().getSegments().stream();
                    keySet.getClass();
                    z = stream.noneMatch((v1) -> {
                        return r1.contains(v1);
                    });
                }
                if (z) {
                    log.debug("{}/{} scale op for epoch {} - history record already added", new Object[]{this.scope, this.name, Integer.valueOf(activeEpoch)});
                    return CompletableFuture.completedFuture(null);
                }
                log.warn("{}/{} scale op for epoch {}. Scale already completed.", new Object[]{this.scope, this.name, Integer.valueOf(activeEpoch)});
                throw new EpochTransitionOperationExceptions.InputInvalidException();
            });
        });
    }

    private CompletableFuture<Void> clearMarkers(Set<Long> set) {
        return Futures.toVoid(Futures.allOfWithResults((List) ((java.util.stream.Stream) set.stream().parallel()).map((v1) -> {
            return removeColdMarker(v1);
        }).collect(Collectors.toList())));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> scaleOldSegmentsSealed(Map<Long, Long> map) {
        return checkState(state -> {
            return state.equals(State.SCALING);
        }).thenCompose(r6 -> {
            return getEpochTransition().thenCompose(epochTransitionRecord -> {
                return Futures.toVoid(clearMarkers(epochTransitionRecord.getSegmentsToSeal()).thenCompose(r11 -> {
                    ImmutableSet keySet = epochTransitionRecord.getNewSegmentsWithRange().keySet();
                    return completePartialRecordInHistory(map, epochTransitionRecord.getActiveEpoch(), epochTransitionRecord.getTime(), historyRecord -> {
                        java.util.stream.Stream<Long> stream = historyRecord.getSegments().stream();
                        map.getClass();
                        return stream.noneMatch((v1) -> {
                            return r1.containsKey(v1);
                        }) && keySet.stream().allMatch(l -> {
                            return historyRecord.getSegments().contains(l);
                        });
                    }).thenCompose(r3 -> {
                        return deleteEpochTransitionNode();
                    });
                }));
            });
        });
    }

    private CompletableFuture<Void> completePartialRecordInHistory(Map<Long, Long> map, int i, long j, Predicate<HistoryRecord> predicate) {
        return getHistoryIndexFromStore().thenCompose(data -> {
            return getHistoryTableFromStore().thenCompose(data -> {
                Optional<HistoryRecord> readLatestRecord = HistoryRecord.readLatestRecord(data.getData(), data.getData(), false);
                if (!$assertionsDisabled && !readLatestRecord.isPresent()) {
                    throw new AssertionError();
                }
                HistoryRecord historyRecord = readLatestRecord.get();
                if (historyRecord.isPartial()) {
                    Data data = new Data(TableHelper.completePartialRecordInHistoryTable(data.getData(), data.getData(), historyRecord, Math.max(Math.max(System.currentTimeMillis(), j), HistoryRecord.fetchPrevious(historyRecord, data.getData(), data.getData()).get().getScaleTime() + 1)), data.getVersion());
                    return addSealedSegmentsToRecord(map).thenCompose(r5 -> {
                        return updateHistoryTable(data);
                    }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r9, th) -> {
                        if (th != null) {
                            log.warn("{}/{} attempt to complete epoch transition for epoch {}. {}", new Object[]{this.scope, this.name, Integer.valueOf(i), th.toString()});
                        } else {
                            log.debug("{}/{} epoch transition complete, index and history tables updated for epoch {}.", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                        }
                    });
                }
                if (predicate.test(historyRecord)) {
                    log.debug("{}/{} epoch transition already completed for epoch {}.", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                    return CompletableFuture.completedFuture(null);
                }
                log.debug("{}/{} epoch transition completion attempt invalid for epoch {}.", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                throw new EpochTransitionOperationExceptions.ConditionInvalidException();
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> rollingTxnNewSegmentsCreated(Map<Long, Long> map, int i, long j) {
        return checkState(state -> {
            return state.equals(State.COMMITTING_TXN) || state.equals(State.SEALING);
        }).thenCompose(r11 -> {
            return addSealedSegmentsToRecord(map).thenCompose(r4 -> {
                return getActiveEpoch(true);
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) historyRecord -> {
                return rollingTxnAddNewDuplicateEpochs(i, j);
            });
        });
    }

    private CompletableFuture<Void> rollingTxnAddNewDuplicateEpochs(int i, long j) {
        return getHistoryIndexFromStore().thenCompose(data -> {
            return getHistoryTableFromStore().thenCompose(data -> {
                int epoch = TableHelper.getActiveEpoch(data.getData(), data.getData()).getEpoch();
                HistoryRecord historyRecord = HistoryRecord.readLatestRecord(data.getData(), data.getData(), false).get();
                int i2 = epoch + 2;
                if (historyRecord.getEpoch() <= epoch) {
                    Pair<byte[], byte[]> insertDuplicateRecordsInHistoryTable = TableHelper.insertDuplicateRecordsInHistoryTable(data.getData(), data.getData(), i, j);
                    Data<T> data = new Data<>((byte[]) insertDuplicateRecordsInHistoryTable.getKey(), data.getVersion());
                    Data data2 = new Data((byte[]) insertDuplicateRecordsInHistoryTable.getValue(), data.getVersion());
                    return updateHistoryIndex(data).thenCompose(r5 -> {
                        return updateHistoryTable(data2);
                    }).whenComplete((BiConsumer<? super U, ? super Throwable>) (r10, th) -> {
                        if (th == null) {
                            log.debug("{}/{} rolling transaction for epoch {}. Creating new epoch and updating history table.", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                        } else {
                            log.warn("{}/{} rollingTransaction for epoch {}. Failed to add partial record to history table.", new Object[]{this.scope, this.name, Integer.valueOf(epoch), th});
                        }
                    });
                }
                boolean z = historyRecord.isPartial() && historyRecord.getEpoch() == i2 && historyRecord.getReferenceEpoch() == epoch;
                if (z) {
                    z &= HistoryRecord.fetchPrevious(historyRecord, data.getData(), data.getData()).get().getReferenceEpoch() == i;
                }
                if (z) {
                    log.debug("{}/{} rolling transaction for epoch {} - history record already added", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                    return CompletableFuture.completedFuture(null);
                }
                log.warn("{}/{} rolling txn for epoch {} is inconsistent.", new Object[]{this.scope, this.name, Integer.valueOf(i)});
                throw new EpochTransitionOperationExceptions.InputInvalidException();
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> rollingTxnActiveEpochSealed(Map<Long, Long> map, int i, long j) {
        Predicate predicate = historyRecord -> {
            return historyRecord.getEpoch() == i + 2 && ((Set) historyRecord.getSegments().stream().map((v0) -> {
                return StreamSegmentNameUtils.getSegmentNumber(v0);
            }).collect(Collectors.toSet())).equals((Set) map.keySet().stream().map((v0) -> {
                return StreamSegmentNameUtils.getSegmentNumber(v0);
            }).collect(Collectors.toSet()));
        };
        return checkState(state -> {
            return state.equals(State.COMMITTING_TXN) || state.equals(State.SEALING);
        }).thenCompose(r14 -> {
            return addSealedSegmentsToRecord(map).thenCompose(r5 -> {
                return clearMarkers(map.keySet());
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r13 -> {
                return completePartialRecordInHistory(map, i, j, predicate);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> resetStateConditionally(State state) {
        return Futures.toVoid(getState(true).thenCompose(state2 -> {
            return state2.equals(state) ? updateState(State.ACTIVE) : CompletableFuture.completedFuture(null);
        }));
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<UUID> generateNewTxnId(int i, long j) {
        return getActiveEpoch(false).thenApply(historyRecord -> {
            return new UUID((historyRecord.getReferenceEpoch() << 32) | (i & 4294967295L), j);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> createTransaction(UUID uuid, long j, long j2) {
        long currentTimeMillis = System.currentTimeMillis();
        long j3 = currentTimeMillis + j;
        long j4 = currentTimeMillis + j2;
        int transactionEpoch = getTransactionEpoch(uuid);
        return verifyLegalState().thenCompose(r17 -> {
            return createNewTransaction(uuid, currentTimeMillis, j3, j4);
        }).thenApply((Function<? super U, ? extends U>) r19 -> {
            return new VersionedTransactionData(transactionEpoch, uuid, 0, TxnStatus.OPEN, currentTimeMillis, currentTimeMillis + j2);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> pingTransaction(VersionedTransactionData versionedTransactionData, long j) {
        int epoch = versionedTransactionData.getEpoch();
        UUID id = versionedTransactionData.getId();
        int version = versionedTransactionData.getVersion();
        long creationTime = versionedTransactionData.getCreationTime();
        long maxExecutionExpiryTime = versionedTransactionData.getMaxExecutionExpiryTime();
        TxnStatus status = versionedTransactionData.getStatus();
        return updateActiveTx(epoch, id, new Data<>(new ActiveTxnRecord(creationTime, System.currentTimeMillis() + j, maxExecutionExpiryTime, status).toByteArray(), Integer.valueOf(version))).thenApply(r19 -> {
            return new VersionedTransactionData(epoch, id, version + 1, status, creationTime, maxExecutionExpiryTime);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<VersionedTransactionData> getTransactionData(UUID uuid) {
        int transactionEpoch = getTransactionEpoch(uuid);
        return getActiveTx(transactionEpoch, uuid).thenApply(data -> {
            ActiveTxnRecord parse = ActiveTxnRecord.parse(data.getData());
            return new VersionedTransactionData(transactionEpoch, uuid, ((Integer) data.getVersion()).intValue(), parse.getTxnStatus(), parse.getTxCreationTimestamp(), parse.getMaxExecutionExpiryTime());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> checkTransactionStatus(UUID uuid) {
        int transactionEpoch = getTransactionEpoch(uuid);
        return verifyLegalState().thenCompose(r7 -> {
            return getActiveTx(transactionEpoch, uuid).handle((data, th) -> {
                if (th != null && (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException)) {
                    return TxnStatus.UNKNOWN;
                }
                if (th != null) {
                    throw new CompletionException(th);
                }
                return ActiveTxnRecord.parse(data.getData()).getTxnStatus();
            }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) txnStatus -> {
                return txnStatus.equals(TxnStatus.UNKNOWN) ? getCompletedTxnStatus(uuid) : CompletableFuture.completedFuture(txnStatus);
            });
        });
    }

    private CompletableFuture<TxnStatus> getCompletedTxnStatus(UUID uuid) {
        return getCompletedTx(uuid).handle((data, th) -> {
            if (th != null && (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException)) {
                return TxnStatus.UNKNOWN;
            }
            if (th != null) {
                throw new CompletionException(th);
            }
            return CompletedTxnRecord.parse(data.getData()).getCompletionStatus();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealTransaction(UUID uuid, boolean z, Optional<Integer> optional) {
        CompletableFuture<Void> verifyLegalState = verifyLegalState();
        int transactionEpoch = getTransactionEpoch(uuid);
        return verifyLegalState.thenCompose(r11 -> {
            return sealActiveTxn(transactionEpoch, uuid, z, optional);
        }).exceptionally((Function<Throwable, ? extends U>) th -> {
            return new AbstractMap.SimpleEntry(handleDataNotFoundException(th), null);
        }).thenCompose(simpleEntry -> {
            return simpleEntry.getKey() == TxnStatus.UNKNOWN ? validateCompletedTxn(uuid, z, "seal").thenApply(txnStatus -> {
                return new AbstractMap.SimpleEntry(txnStatus, null);
            }) : CompletableFuture.completedFuture(simpleEntry);
        });
    }

    private CompletableFuture<AbstractMap.SimpleEntry<TxnStatus, Integer>> sealActiveTxn(int i, UUID uuid, boolean z, Optional<Integer> optional) {
        return getActiveTx(i, uuid).thenCompose(data -> {
            ActiveTxnRecord parse = ActiveTxnRecord.parse(data.getData());
            int intValue = optional.isPresent() ? ((Integer) optional.get()).intValue() : ((Integer) data.getVersion()).intValue();
            TxnStatus txnStatus = parse.getTxnStatus();
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                    if (z) {
                        throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                    }
                    return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                case ApiResponseMessage.INFO /* 3 */:
                    return sealActiveTx(i, uuid, z, parse, intValue).thenApply(r7 -> {
                        return new AbstractMap.SimpleEntry(z ? TxnStatus.COMMITTING : TxnStatus.ABORTING, Integer.valueOf(i));
                    });
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    if (z) {
                        return CompletableFuture.completedFuture(new AbstractMap.SimpleEntry(txnStatus, Integer.valueOf(i)));
                    }
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> commitTransaction(UUID uuid) {
        int transactionEpoch = getTransactionEpoch(uuid);
        return checkState(state -> {
            return state.equals(State.COMMITTING_TXN) || state.equals(State.SEALING);
        }).thenCompose(r5 -> {
            return checkTransactionStatus(uuid);
        }).thenApply((Function<? super U, ? extends U>) txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                case ApiResponseMessage.INFO /* 3 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.toString());
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    return txnStatus;
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose(txnStatus2 -> {
            return txnStatus2.equals(TxnStatus.COMMITTING) ? createCompletedTxEntry(uuid, TxnStatus.COMMITTED, System.currentTimeMillis()) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.COMMITTED;
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<TxnStatus> abortTransaction(UUID uuid) {
        int transactionEpoch = getTransactionEpoch(uuid);
        return verifyLegalState().thenCompose(r5 -> {
            return checkTransactionStatus(uuid);
        }).thenApply((Function<? super U, ? extends U>) txnStatus -> {
            switch (AnonymousClass1.$SwitchMap$io$pravega$controller$store$stream$TxnStatus[txnStatus.ordinal()]) {
                case ApiResponseMessage.ERROR /* 1 */:
                case ApiResponseMessage.WARNING /* 2 */:
                    return txnStatus;
                case ApiResponseMessage.INFO /* 3 */:
                case ApiResponseMessage.OK /* 4 */:
                case ApiResponseMessage.TOO_BUSY /* 5 */:
                    throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
                case 6:
                default:
                    throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
        }).thenCompose(txnStatus2 -> {
            return txnStatus2.equals(TxnStatus.ABORTING) ? createCompletedTxEntry(uuid, TxnStatus.ABORTED, System.currentTimeMillis()) : CompletableFuture.completedFuture(null);
        }).thenCompose(r7 -> {
            return removeActiveTxEntry(transactionEpoch, uuid);
        }).thenApply(r2 -> {
            return TxnStatus.ABORTED;
        });
    }

    private TxnStatus handleDataNotFoundException(Throwable th) {
        if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
            return TxnStatus.UNKNOWN;
        }
        throw th;
    }

    private CompletableFuture<TxnStatus> validateCompletedTxn(UUID uuid, boolean z, String str) {
        return getCompletedTxnStatus(uuid).thenApply(txnStatus -> {
            if ((z && txnStatus == TxnStatus.COMMITTED) || (!z && txnStatus == TxnStatus.ABORTED)) {
                return txnStatus;
            }
            if (txnStatus == TxnStatus.UNKNOWN) {
                throw StoreException.create(StoreException.Type.DATA_NOT_FOUND, "Stream: " + getName() + " Transaction: " + uuid.toString());
            }
            throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Transaction: " + uuid.toString() + " State: " + txnStatus.name());
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getActiveTxns() {
        return verifyLegalState().thenCompose(r3 -> {
            return getCurrentTxns();
        }).thenApply((Function<? super U, ? extends U>) map -> {
            return (Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
                return UUID.fromString((String) entry.getKey());
            }, entry2 -> {
                return ActiveTxnRecord.parse(((Data) entry2.getValue()).getData());
            }));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<HistoryRecord> getActiveEpoch(boolean z) {
        return (z ? getHistoryIndexFromStore() : getHistoryIndex()).thenCompose(data -> {
            return (z ? getHistoryTableFromStore() : getHistoryTable()).thenApply(data -> {
                return TableHelper.getActiveEpoch(data.getData(), data.getData());
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<HistoryRecord> getEpochRecord(int i) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenApply(data -> {
                return TableHelper.getEpochRecord(data.getData(), data.getData(), i);
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> setColdMarker(long j, long j2) {
        return verifyLegalState().thenCompose(r7 -> {
            return getMarkerData(j);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) data -> {
            if (data == null) {
                return createMarkerData(j, j2);
            }
            byte[] bArr = new byte[8];
            BitConverter.writeLong(bArr, 0, j2);
            return updateMarkerData(j, new Data<>(bArr, data.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getColdMarker(long j) {
        return verifyLegalState().thenCompose(r7 -> {
            return getMarkerData(j);
        }).thenApply((Function<? super U, ? extends U>) data -> {
            return Long.valueOf(data != null ? BitConverter.readLong(data.getData(), 0) : 0L);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> removeColdMarker(long j) {
        return verifyLegalState().thenCompose(r7 -> {
            return removeMarkerData(j);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Long> getSizeTillStreamCut(Map<Long, Long> map) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenCompose(data -> {
                return getSegmentIndex().thenCompose(data -> {
                    return getSegmentTable().thenCompose(data -> {
                        return getSealedSegmentsRecord().thenApply(data -> {
                            return Long.valueOf(TableHelper.getSizeTillStreamCut(data.getData(), data.getData(), data.getData(), data.getData(), map, SealedSegmentsRecord.parse(data.getData())));
                        });
                    });
                });
            });
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> addStreamCutToRetentionSet(StreamCutRecord streamCutRecord) {
        return getRetentionSet().thenCompose(data -> {
            RetentionRecord parse = RetentionRecord.parse(data.getData());
            return parse.getStreamCuts().contains(streamCutRecord) ? CompletableFuture.completedFuture(null) : updateRetentionSet(new Data<>(RetentionRecord.addStreamCutIfLatest(parse, streamCutRecord).toByteArray(), data.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<List<StreamCutRecord>> getRetentionStreamCuts() {
        return getRetentionSet().thenApply(data -> {
            return RetentionRecord.parse(data.getData());
        }).thenApply((Function<? super U, ? extends U>) (v0) -> {
            return v0.getStreamCuts();
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteStreamCutBefore(StreamCutRecord streamCutRecord) {
        return getRetentionSet().thenCompose(data -> {
            RetentionRecord parse = RetentionRecord.parse(data.getData());
            return !parse.getStreamCuts().contains(streamCutRecord) ? CompletableFuture.completedFuture(null) : updateRetentionSet(new Data<>(RetentionRecord.removeStreamCutBefore(parse, streamCutRecord).toByteArray(), data.getVersion()));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> createCommittingTransactionsRecord(int i, List<UUID> list) {
        return createCommittingTxnRecord(new CommittingTransactionsRecord(i, list).toByteArray());
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<CommittingTransactionsRecord> getCommittingTransactionsRecord() {
        CompletableFuture<CommittingTransactionsRecord> completableFuture = new CompletableFuture<>();
        getCommittingTxnRecord().whenComplete((data, th) -> {
            if (th == null) {
                completableFuture.complete(CommittingTransactionsRecord.parse(data.getData()));
            } else if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                completableFuture.complete(null);
            } else {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteCommittingTransactionsRecord() {
        return deleteCommittingTxnRecord();
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Map<UUID, ActiveTxnRecord>> getTransactionsInEpoch(int i) {
        return getTxnInEpoch(i).thenApply(map -> {
            return (Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
                return UUID.fromString((String) entry.getKey());
            }, entry2 -> {
                return ActiveTxnRecord.parse(((Data) entry2.getValue()).getData());
            }));
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> createWaitingRequestIfAbsent(String str) {
        return createWaitingRequestNodeIfAbsent(str.getBytes());
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<String> getWaitingRequestProcessor() {
        return getWaitingRequestNode().handle((data, th) -> {
            if (th == null) {
                return new String(data.getData());
            }
            if (Exceptions.unwrap(th) instanceof StoreException.DataNotFoundException) {
                return null;
            }
            throw new CompletionException(th);
        });
    }

    @Override // io.pravega.controller.store.stream.Stream
    public CompletableFuture<Void> deleteWaitingRequestConditionally(String str) {
        return getWaitingRequestProcessor().thenCompose(str2 -> {
            return (str2 == null || !str2.equals(str)) ? CompletableFuture.completedFuture(null) : deleteWaitingRequestNode();
        });
    }

    private CompletableFuture<Void> checkState(Predicate<State> predicate) {
        return getState(true).thenAccept(state -> {
            if (!predicate.test(state)) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " Current State: " + state.name());
            }
        });
    }

    private CompletableFuture<Void> verifyLegalState() {
        return getState(false).thenApply(state -> {
            if (state == null || state.equals(State.UNKNOWN) || state.equals(State.CREATING)) {
                throw StoreException.create(StoreException.Type.ILLEGAL_STATE, "Stream: " + getName() + " State: " + state.name());
            }
            return null;
        });
    }

    private CompletableFuture<Void> addSealedSegmentsToRecord(Map<Long, Long> map) {
        return getSealedSegmentsRecord().thenCompose(data -> {
            SealedSegmentsRecord parse = SealedSegmentsRecord.parse(data.getData());
            HashMap hashMap = new HashMap();
            hashMap.putAll(map);
            hashMap.putAll(parse.getSealedSegmentsSizeMap());
            return updateSealedSegmentsRecord(new Data<>(new SealedSegmentsRecord(hashMap).toByteArray(), data.getVersion()));
        });
    }

    private List<Long> getNewActiveSegments(Set<Long> set, Set<Long> set2, HistoryRecord historyRecord) {
        List<Long> segments = historyRecord.getSegments();
        segments.removeAll(set2);
        segments.addAll(set);
        return segments;
    }

    private CompletableFuture<Void> addHistoryIndexRecord(int i, int i2) {
        return getHistoryIndex().thenCompose(data -> {
            Optional<HistoryIndexRecord> readLatestRecord = HistoryIndexRecord.readLatestRecord(data.getData());
            return (readLatestRecord.isPresent() && readLatestRecord.get().getEpoch() == i) ? CompletableFuture.completedFuture(null) : updateHistoryIndex(new Data<>(TableHelper.updateHistoryIndex(data.getData(), i2), data.getVersion()));
        });
    }

    private CompletableFuture<Segment> getSegmentRow(long j) {
        return getHistoryIndex().thenCompose(data -> {
            return getHistoryTable().thenCompose(data -> {
                return getSegmentIndex().thenCompose(data -> {
                    return getSegmentTable().thenApply(data -> {
                        return TableHelper.getSegment(j, data.getData(), data.getData(), data.getData(), data.getData());
                    });
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getTransactionEpoch(UUID uuid) {
        return TableHelper.getTransactionEpoch(uuid);
    }

    abstract CompletableFuture<Void> deleteStream();

    abstract CompletableFuture<CreateStreamResponse> checkStreamExists(StreamConfiguration streamConfiguration, long j);

    abstract CompletableFuture<Void> storeCreationTimeIfAbsent(long j);

    abstract CompletableFuture<Void> createConfigurationIfAbsent(StreamConfigurationRecord streamConfigurationRecord);

    abstract CompletableFuture<Void> setConfigurationData(Data<T> data);

    abstract CompletableFuture<Data<T>> getConfigurationData(boolean z);

    abstract CompletableFuture<Void> setTruncationData(Data<T> data);

    abstract CompletableFuture<Void> createTruncationDataIfAbsent(StreamTruncationRecord streamTruncationRecord);

    abstract CompletableFuture<Data<T>> getTruncationData(boolean z);

    abstract CompletableFuture<Void> createStateIfAbsent(State state);

    abstract CompletableFuture<Void> setStateData(Data<T> data);

    abstract CompletableFuture<Data<T>> getStateData(boolean z);

    abstract CompletableFuture<Void> createSegmentIndexIfAbsent(Data<T> data);

    abstract CompletableFuture<Data<T>> getSegmentIndex();

    abstract CompletableFuture<Data<T>> getSegmentIndexFromStore();

    abstract CompletableFuture<Void> updateSegmentIndex(Data<T> data);

    abstract CompletableFuture<Void> createSegmentTableIfAbsent(Data<T> data);

    abstract CompletableFuture<Data<T>> getSegmentTable();

    abstract CompletableFuture<Data<T>> getSegmentTableFromStore();

    abstract CompletableFuture<Void> updateSegmentTable(Data<T> data);

    abstract CompletableFuture<Void> createHistoryIndexIfAbsent(Data<T> data);

    abstract CompletableFuture<Data<T>> getHistoryIndex();

    abstract CompletableFuture<Data<T>> getHistoryIndexFromStore();

    abstract CompletableFuture<Void> updateHistoryIndex(Data<T> data);

    abstract CompletableFuture<Void> createHistoryTableIfAbsent(Data<T> data);

    abstract CompletableFuture<Void> updateHistoryTable(Data<T> data);

    abstract CompletableFuture<Data<T>> getHistoryTable();

    abstract CompletableFuture<Data<T>> getHistoryTableFromStore();

    abstract CompletableFuture<Void> createNewTransaction(UUID uuid, long j, long j2, long j3);

    abstract CompletableFuture<Data<Integer>> getActiveTx(int i, UUID uuid);

    abstract CompletableFuture<Void> updateActiveTx(int i, UUID uuid, Data<Integer> data);

    abstract CompletableFuture<Void> sealActiveTx(int i, UUID uuid, boolean z, ActiveTxnRecord activeTxnRecord, int i2);

    abstract CompletableFuture<Data<Integer>> getCompletedTx(UUID uuid);

    abstract CompletableFuture<Void> removeActiveTxEntry(int i, UUID uuid);

    abstract CompletableFuture<Void> createCompletedTxEntry(UUID uuid, TxnStatus txnStatus, long j);

    abstract CompletableFuture<Void> createMarkerData(long j, long j2);

    abstract CompletableFuture<Void> updateMarkerData(long j, Data<T> data);

    abstract CompletableFuture<Void> removeMarkerData(long j);

    abstract CompletableFuture<Data<T>> getMarkerData(long j);

    abstract CompletableFuture<Map<String, Data<T>>> getCurrentTxns();

    abstract CompletableFuture<Map<String, Data<T>>> getTxnInEpoch(int i);

    abstract CompletableFuture<Void> checkScopeExists() throws StoreException;

    abstract CompletableFuture<Void> createSealedSegmentsRecord(byte[] bArr);

    abstract CompletableFuture<Data<T>> getSealedSegmentsRecord();

    abstract CompletableFuture<Void> updateSealedSegmentsRecord(Data<T> data);

    abstract CompletableFuture<Void> createRetentionSet(byte[] bArr);

    abstract CompletableFuture<Data<T>> getRetentionSet();

    abstract CompletableFuture<Void> updateRetentionSet(Data<T> data);

    abstract CompletableFuture<Void> createEpochTransitionNode(byte[] bArr);

    abstract CompletableFuture<Void> updateEpochTransitionNode(byte[] bArr);

    abstract CompletableFuture<Data<T>> getEpochTransitionNode();

    abstract CompletableFuture<Void> deleteEpochTransitionNode();

    abstract CompletableFuture<Void> createCommittingTxnRecord(byte[] bArr);

    abstract CompletableFuture<Data<T>> getCommittingTxnRecord();

    abstract CompletableFuture<Void> deleteCommittingTxnRecord();

    abstract CompletableFuture<Void> createWaitingRequestNodeIfAbsent(byte[] bArr);

    abstract CompletableFuture<Data<T>> getWaitingRequestNode();

    abstract CompletableFuture<Void> deleteWaitingRequestNode();

    static {
        $assertionsDisabled = !PersistentStreamBase.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PersistentStreamBase.class);
    }
}
