package io.pravega.controller.store;

import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.pravega.client.tables.impl.TableSegmentEntry;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.HashHelper;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.AsyncIterator;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.PravegaTablesStreamMetadataStore;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.records.TagRecord;
import io.pravega.shared.NameUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/store/PravegaTablesScope.class */
public class PravegaTablesScope implements Scope {
    // Templates for the per-scope metadata table names; "%s" receives the scope's unique id.
    private static final String KVTABLES_IN_SCOPE_TABLE_FORMAT = "kvTablesInScope.#.%s";
    private static final String READER_GROUPS_IN_SCOPE_TABLE_FORMAT = "readerGroupsInScope.#.%s";
    private static final String STREAMS_IN_SCOPE_TABLE_FORMAT = "streamsInScope.#.%s";
    // Template for a stream-tag index table: first "%s" is the scope id, second the chunk index.
    private static final String STREAM_TAGS_IN_SCOPE = "streamTagsInScope.#.%s.#.%s";
    // Number of chunk tables the stream-tag index of a scope is split into.
    private static final int MAX_STREAM_TAG_CHUNKS = 25;
    // Suffix of the last chunk table's name. Derived from the chunk count (still a compile-time
    // constant, ".#.24") so that changing the count cannot silently break tag-listing termination.
    private static final String LAST_TAG_CHUNK = ".#." + (MAX_STREAM_TAG_CHUNKS - 1);
    private final String scopeName;
    private final PravegaTablesStoreHelper storeHelper;
    // Lazily populated copy of the scope's id; cleared by refresh().
    private final AtomicReference<UUID> idRef = new AtomicReference<>(null);
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(PravegaTablesScope.class));
    // Hash used to map a stream name onto one of the MAX_STREAM_TAG_CHUNKS tag tables.
    private static final HashHelper HASH = HashHelper.seededWith("StreamTags");

    /**
     * Creates a handle on the metadata of a single scope.
     *
     * @param str                      name of the scope this instance represents.
     * @param pravegaTablesStoreHelper helper through which every table operation of this class is issued.
     */
    public PravegaTablesScope(String str, PravegaTablesStoreHelper pravegaTablesStoreHelper) {
        storeHelper = pravegaTablesStoreHelper;
        scopeName = str;
    }

    /**
     * Returns the scope name this instance was constructed with.
     *
     * @return the scope name.
     */
    @Override // io.pravega.controller.store.Scope
    public String getName() {
        return scopeName;
    }

    /**
     * Adds an entry for this scope to the scopes table and then creates the scope's own tables:
     * streams, key-value tables, reader groups, and every stream-tag chunk table.
     *
     * <p>If adding the entry fails with {@link StoreException.DataExistsException}, the tables are
     * still created, and only afterwards is the failure propagated (wrapped in a
     * {@link CompletionException}). Any other failure is propagated immediately without creating
     * tables.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes once the entry and all tables exist, or fails as described above.
     * @throws NullPointerException if {@code operationContext} is null.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<Void> createScope(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final long requestId = operationContext.getRequestId();
        return Futures.handleCompose(withCreateTableIfAbsent(() -> {
            return this.storeHelper.addNewEntry(PravegaTablesStreamMetadataStore.SCOPES_TABLE, this.scopeName, newId(), PravegaTablesStoreHelper.UUID_TO_BYTES_FUNCTION, requestId);
        }, PravegaTablesStreamMetadataStore.SCOPES_TABLE, operationContext), (version, th) -> {
            boolean alreadyExists = th != null && Exceptions.unwrap(th) instanceof StoreException.DataExistsException;
            if (th != null && !alreadyExists) {
                // Unexpected failure: do not attempt to create any scope table.
                throw new CompletionException(th);
            }
            return CompletableFuture.allOf(
                    createTableThenRethrow(getStreamsInScopeTableName(operationContext), "table for streams created {}", th, requestId),
                    createTableThenRethrow(getKVTablesInScopeTableName(operationContext), "table for kvts created {}", th, requestId),
                    createTableThenRethrow(getReaderGroupsInScopeTableName(operationContext), "table for reader groups created {}", th, requestId),
                    getAllStreamTagsInScopeTableNames(operationContext)
                            .thenCompose(tagTables -> Futures.allOf(tagTables.stream()
                                    .map(tagTable -> this.storeHelper.createTable(tagTable, requestId))
                                    .collect(Collectors.toList())))
                            .thenAccept(ignored -> rethrowIfPresent(th)));
        });
    }

    /**
     * Creates the table whose name {@code tableNameFuture} yields, logs it, and then re-raises
     * {@code pending} (if any) so the caller still observes the original failure.
     */
    private CompletableFuture<Void> createTableThenRethrow(CompletableFuture<String> tableNameFuture, String logFormat,
                                                           Throwable pending, long requestId) {
        return tableNameFuture.thenCompose(tableName ->
                this.storeHelper.createTable(tableName, requestId).thenAccept(ignored -> {
                    log.debug(requestId, logFormat, new Object[]{tableName});
                    rethrowIfPresent(pending);
                }));
    }

    /** Throws {@code pending} wrapped in a {@link CompletionException} when it is non-null. */
    private static void rethrowIfPresent(Throwable pending) {
        if (pending != null) {
            throw new CompletionException(pending);
        }
    }

    /**
     * Resolves the name of this scope's streams table, first invalidating the store helper's
     * cached scopes-table entry for this scope.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getStreamsInScopeTableName(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final boolean invalidateCachedEntry = true;
        return getStreamsInScopeTableName(invalidateCachedEntry, operationContext);
    }

    /**
     * Resolves the name of the table that lists the streams of this scope. The name is built
     * from the scope name and the scope's id.
     *
     * @param z                when true, the store helper's cached scopes-table entry for this
     *                         scope is invalidated before the id is resolved.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getStreamsInScopeTableName(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        if (z) {
            storeHelper.invalidateCache(PravegaTablesStreamMetadataStore.SCOPES_TABLE, scopeName);
        }
        return getId(operationContext)
                .thenApply(scopeId -> String.format(STREAMS_IN_SCOPE_TABLE_FORMAT, scopeId.toString()))
                .thenApply(tableSuffix -> NameUtils.getQualifiedTableName("_system", new String[]{scopeName, tableSuffix}));
    }

    /**
     * Resolves the name of this scope's key-value-tables table, first invalidating the store
     * helper's cached scopes-table entry for this scope.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getKVTablesInScopeTableName(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final boolean invalidateCachedEntry = true;
        return getKVTablesInScopeTableName(invalidateCachedEntry, operationContext);
    }

    /**
     * Resolves the name of the table that lists the key-value tables of this scope. The name is
     * built from the scope name and the scope's id.
     *
     * @param z                when true, the store helper's cached scopes-table entry for this
     *                         scope is invalidated before the id is resolved.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getKVTablesInScopeTableName(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        if (z) {
            storeHelper.invalidateCache(PravegaTablesStreamMetadataStore.SCOPES_TABLE, scopeName);
        }
        return getId(operationContext)
                .thenApply(scopeId -> String.format(KVTABLES_IN_SCOPE_TABLE_FORMAT, scopeId.toString()))
                .thenApply(tableSuffix -> NameUtils.getQualifiedTableName("_system", new String[]{scopeName, tableSuffix}));
    }

    /**
     * Resolves the name of the single stream-tag chunk table that the given string hashes to.
     * Despite the plural method name, exactly one table name is produced.
     *
     * @param str              value hashed into one of {@code MAX_STREAM_TAG_CHUNKS} buckets
     *                         (callers in this class pass a stream name).
     * @param operationContext context of the calling operation.
     * @return a future holding the qualified name of the selected chunk table.
     */
    public CompletableFuture<String> getAllStreamTagsInScopeTableNames(String str, OperationContext operationContext) {
        return getId(operationContext).thenApply(scopeId -> {
            final int chunk = HASH.hashToBucket(str, MAX_STREAM_TAG_CHUNKS);
            final String tableSuffix = String.format(STREAM_TAGS_IN_SCOPE, scopeId.toString(), chunk);
            return NameUtils.getQualifiedTableName("_system", new String[]{scopeName, tableSuffix});
        });
    }

    /**
     * Resolves the names of all stream-tag chunk tables of this scope, ordered by chunk index
     * from 0 to {@code MAX_STREAM_TAG_CHUNKS - 1}.
     *
     * @param operationContext context of the calling operation.
     * @return a future holding the qualified table names in chunk order.
     */
    public CompletableFuture<List<String>> getAllStreamTagsInScopeTableNames(OperationContext operationContext) {
        return getId(operationContext).thenApply(scopeId -> {
            final List<String> tableNames = new ArrayList<>(MAX_STREAM_TAG_CHUNKS);
            for (int chunk = 0; chunk < MAX_STREAM_TAG_CHUNKS; chunk++) {
                final String tableSuffix = String.format(STREAM_TAGS_IN_SCOPE, scopeId.toString(), chunk);
                tableNames.add(NameUtils.getQualifiedTableName("_system", new String[]{scopeName, tableSuffix}));
            }
            return tableNames;
        });
    }

    /**
     * Resolves the name of this scope's reader-groups table, first invalidating the store
     * helper's cached scopes-table entry for this scope.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getReaderGroupsInScopeTableName(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final boolean invalidateCachedEntry = true;
        return getReaderGroupsInScopeTableName(invalidateCachedEntry, operationContext);
    }

    /**
     * Resolves the name of the table that lists the reader groups of this scope. The name is
     * built from the scope name and the scope's id.
     *
     * @param z                when true, the store helper's cached scopes-table entry for this
     *                         scope is invalidated before the id is resolved.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the qualified table name.
     */
    public CompletableFuture<String> getReaderGroupsInScopeTableName(boolean z, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        if (z) {
            storeHelper.invalidateCache(PravegaTablesStreamMetadataStore.SCOPES_TABLE, scopeName);
        }
        return getId(operationContext)
                .thenApply(scopeId -> String.format(READER_GROUPS_IN_SCOPE_TABLE_FORMAT, scopeId.toString()))
                .thenApply(tableSuffix -> NameUtils.getQualifiedTableName("_system", new String[]{scopeName, tableSuffix}));
    }

    /**
     * Returns the id of this scope. A value already held in {@code idRef} is returned directly;
     * otherwise the scope's entry is fetched from the scopes table through the store helper,
     * remembered in {@code idRef} if it is still unset, and the lookup is repeated.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the scope id.
     */
    CompletableFuture<UUID> getId(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final UUID knownId = idRef.get();
        if (!Objects.isNull(knownId)) {
            return CompletableFuture.completedFuture(knownId);
        }
        return storeHelper.getCachedOrLoad(PravegaTablesStreamMetadataStore.SCOPES_TABLE, scopeName, PravegaTablesStoreHelper.BYTES_TO_UUID_FUNCTION, 0L, operationContext.getRequestId())
                .thenCompose(scopeEntry -> {
                    // Only the first loader publishes; a value set concurrently is kept.
                    idRef.compareAndSet(null, (UUID) scopeEntry.getObject());
                    return getId(operationContext);
                });
    }

    /**
     * Deletes all tables belonging to this scope and then removes the scope's entry from the
     * scopes table.
     *
     * <p>The streams, key-value-tables and reader-groups tables are deleted with the boolean flag
     * set to {@code true}, the stream-tag chunk tables with {@code false}.
     * NOTE(review): the flag presumably means "table must be empty" — confirm against
     * {@code PravegaTablesStoreHelper.deleteTable}.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the tables and the scope entry have been removed.
     * @throws NullPointerException if {@code operationContext} is null.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<Void> deleteScope(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final long requestId = operationContext.getRequestId();
        CompletableFuture<String> streamsTableFuture = getStreamsInScopeTableName(true, operationContext);
        CompletableFuture<String> readerGroupsTableFuture = getReaderGroupsInScopeTableName(operationContext);
        CompletableFuture<String> kvTablesTableFuture = getKVTablesInScopeTableName(operationContext);
        CompletableFuture<List<String>> tagTablesFuture = getAllStreamTagsInScopeTableNames(operationContext);
        return CompletableFuture.allOf(streamsTableFuture, readerGroupsTableFuture, kvTablesTableFuture, tagTablesFuture)
                .thenCompose(ignored -> {
                    // All name futures are complete here, so join() does not block.
                    String streamsTable = streamsTableFuture.join();
                    String kvTablesTable = kvTablesTableFuture.join();
                    String readerGroupsTable = readerGroupsTableFuture.join();
                    return CompletableFuture.allOf(
                            this.storeHelper.deleteTable(streamsTable, true, requestId),
                            this.storeHelper.deleteTable(kvTablesTable, true, requestId),
                            this.storeHelper.deleteTable(readerGroupsTable, true, requestId),
                            Futures.allOf(tagTablesFuture.join().stream()
                                    .map(tagTable -> this.storeHelper.deleteTable(tagTable, false, requestId))
                                    .collect(Collectors.toList())))
                            .thenAccept(deleted -> log.debug(requestId, "tables deleted {} {} {}",
                                    new Object[]{streamsTable, kvTablesTable, readerGroupsTable}));
                })
                .thenCompose(tablesDeleted -> this.storeHelper.removeEntry(
                        PravegaTablesStreamMetadataStore.SCOPES_TABLE, this.scopeName, requestId));
    }

    /**
     * Reads one page of stream names from this scope's streams table.
     *
     * @param i                maximum number of keys requested for the page.
     * @param str              Base64-encoded continuation token to resume from.
     * @param executor         not used by this implementation.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the stream names read and the token for the next page.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<Pair<List<String>, String>> listStreams(int i, String str, Executor executor, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final CompletableFuture<String> streamsTable = getStreamsInScopeTableName(operationContext);
        return streamsTable.thenCompose(tableName -> readAll(i, str, tableName, operationContext));
    }

    /**
     * Lists streams carrying the given tag, one tag chunk table at a time. Chunks that yield no
     * streams are skipped by recursing with the token of the chunk just read, until a chunk
     * returns streams or the last chunk has been reached.
     *
     * @param str              tag to look up.
     * @param str2             continuation token: empty to start from the first chunk, otherwise
     *                         the chunk table name returned by the previous call.
     * @param executor         only passed along to the recursive call.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the streams found and the continuation token for the next call.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<Pair<List<String>, String>> listStreamsForTag(String str, String str2, Executor executor, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getStreamsFromNextTagChunk(str, str2, operationContext).thenCompose(chunkResult -> {
            if (!chunkResult.getLeft().isEmpty()) {
                return CompletableFuture.completedFuture(chunkResult);
            }
            final String nextToken = chunkResult.getRight();
            if (nextToken.endsWith(LAST_TAG_CHUNK)) {
                return CompletableFuture.completedFuture(chunkResult);
            }
            // Empty chunk and more chunks remain: keep scanning.
            return listStreamsForTag(str, nextToken, executor, operationContext);
        });
    }

    /**
     * Reads the streams recorded for a tag in the chunk table that follows the given token.
     *
     * <p>An empty token selects the first chunk table; otherwise the table after the one named
     * by the token is selected. A token already naming the last chunk yields an empty result
     * with the token unchanged. A missing entry in the selected table yields an empty list.
     *
     * @param str              tag to look up.
     * @param str2             continuation token (empty, or a chunk table name).
     * @param operationContext context of the calling operation.
     * @return a future holding the streams found and the name of the chunk table that was read.
     */
    CompletableFuture<Pair<List<String>, String>> getStreamsFromNextTagChunk(String str, String str2, OperationContext operationContext) {
        if (str2.endsWith(LAST_TAG_CHUNK)) {
            return CompletableFuture.completedFuture(new ImmutablePair<>(Collections.emptyList(), str2));
        }
        return getAllStreamTagsInScopeTableNames(operationContext)
                .thenApply(chunkTables -> str2.isEmpty()
                        ? chunkTables.get(0)
                        : chunkTables.get(chunkTables.indexOf(str2) + 1))
                .thenCompose(chunkTable -> this.storeHelper.expectingDataNotFound(
                        this.storeHelper.getEntry(chunkTable, str, TagRecord::fromBytes, operationContext.getRequestId())
                                .thenApply(tagEntry -> new ImmutablePair<>(
                                        new ArrayList<String>(((TagRecord) tagEntry.getObject()).getStreams()), chunkTable)),
                        new ImmutablePair<>(Collections.emptyList(), chunkTable)));
    }

    /**
     * Collects every key of this scope's streams table, i.e. the names of all streams in the scope.
     * A data-not-found failure while reading the table is reported as an empty list.
     *
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the stream names.
     * @throws NullPointerException if {@code operationContext} is null.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<List<String>> listStreamsInScope(OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final List<String> streamNames = new ArrayList<>();
        return getStreamsInScopeTableName(operationContext).thenCompose(streamsTable -> {
            AsyncIterator<String> allKeys = this.storeHelper.getAllKeys(streamsTable, operationContext.getRequestId());
            // List.add always returns true, so the iterator is drained completely.
            return Futures.exceptionallyExpecting(
                    allKeys.collectRemaining(streamNames::add).thenApply(done -> streamNames),
                    PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                    Collections.emptyList());
        });
    }

    /**
     * Forgets the locally held scope id so that the next {@code getId} call resolves it again
     * through the store helper.
     */
    @Override // io.pravega.controller.store.Scope
    public void refresh() {
        idRef.set(null);
    }

    /**
     * Adds an entry for the stream to this scope's streams table if none exists yet, creating
     * the table first when the initial attempt reports data-not-found. The entry's value is a
     * freshly generated id.
     *
     * @param str              name of the stream to register.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the entry has been added (or was already present).
     */
    public CompletableFuture<Void> addStreamToScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getStreamsInScopeTableName(operationContext).thenCompose(streamsTable ->
                Futures.toVoid(withCreateTableIfAbsent(
                        () -> storeHelper.addNewEntryIfAbsent(streamsTable, str, newId(),
                                PravegaTablesStoreHelper.UUID_TO_BYTES_FUNCTION, operationContext.getRequestId()),
                        streamsTable, operationContext)));
    }

    /**
     * Adds the stream to the index entry of each given tag. All tags of a stream are indexed in
     * the single chunk table selected by hashing the stream name.
     *
     * @param str              name of the stream being tagged.
     * @param set              tags to index the stream under.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when every tag entry has been updated.
     * @throws NullPointerException if {@code operationContext} is null.
     */
    public CompletableFuture<Void> addTagsUnderScope(String str, Set<String> set, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getAllStreamTagsInScopeTableNames(str, operationContext).thenCompose(tagTable ->
                Futures.allOf(set.stream().parallel().map(tag -> {
                    log.debug(operationContext.getRequestId(), "Adding stream {} to tag {} index on table {}", new Object[]{str, tag, tagTable});
                    return this.storeHelper.getAndUpdateEntry(tagTable, tag,
                            existingEntry -> appendStreamToEntry(tag, str, existingEntry),
                            // Adding never leaves an entry that qualifies for removal.
                            updatedEntry -> false,
                            operationContext.getRequestId());
                }).collect(Collectors.toList())));
    }

    /**
     * Builds the replacement table entry for a tag after adding a stream to it. An empty current
     * value starts a new {@link TagRecord} for the tag; otherwise the stream is added to the
     * record decoded from the current value. The new entry carries the segment version of the
     * entry it replaces.
     *
     * @param str               tag name; also used (UTF-8 encoded) as the entry key.
     * @param str2              stream to add to the tag's record.
     * @param tableSegmentEntry entry currently stored for the tag.
     * @return the versioned entry to write back.
     */
    private TableSegmentEntry appendStreamToEntry(String str, String str2, TableSegmentEntry tableSegmentEntry) {
        final byte[] currentValue = storeHelper.getArray(tableSegmentEntry.getValue());
        final byte[] key = str.getBytes(StandardCharsets.UTF_8);
        final byte[] updatedValue;
        // NOTE(review): m228build looks like a decompiler-renamed build() — confirm against TagRecord's builder.
        if (currentValue.length == 0) {
            updatedValue = TagRecord.builder().tagName(str).stream(str2).m228build().toBytes();
        } else {
            updatedValue = TagRecord.fromBytes(currentValue).toBuilder().stream(str2).m228build().toBytes();
        }
        return TableSegmentEntry.versioned(key, updatedValue, tableSegmentEntry.getKey().getVersion().getSegmentVersion());
    }

    /**
     * Removes the stream from the index entry of each given tag, in the chunk table selected by
     * hashing the stream name. {@code isEmptyTagRecord} is handed to the store helper as the
     * predicate evaluated on the resulting entry.
     *
     * @param str              name of the stream being untagged.
     * @param set              tags to remove the stream from.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when every tag entry has been updated.
     * @throws NullPointerException if {@code operationContext} is null.
     */
    public CompletableFuture<Void> removeTagsUnderScope(String str, Set<String> set, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getAllStreamTagsInScopeTableNames(str, operationContext).thenCompose(tagTable ->
                Futures.allOf(set.stream().parallel().map(tag -> {
                    log.debug(operationContext.getRequestId(), "Removing stream {} from tag {} index on table {}", new Object[]{str, tag, tagTable});
                    return this.storeHelper.getAndUpdateEntry(tagTable, tag,
                            existingEntry -> removeStreamFromEntry(tag, str, existingEntry),
                            this::isEmptyTagRecord,
                            operationContext.getRequestId());
                }).collect(Collectors.toList())));
    }

    /**
     * Builds the replacement table entry for a tag after removing a stream from it. An empty
     * current value stays empty; otherwise the stream is removed from the record decoded from
     * the current value. The new entry carries the segment version of the entry it replaces.
     *
     * @param str               tag name; used (UTF-8 encoded) as the entry key.
     * @param str2              stream to remove from the tag's record.
     * @param tableSegmentEntry entry currently stored for the tag.
     * @return the versioned entry to write back.
     */
    private TableSegmentEntry removeStreamFromEntry(String str, String str2, TableSegmentEntry tableSegmentEntry) {
        final byte[] currentValue = storeHelper.getArray(tableSegmentEntry.getValue());
        // NOTE(review): m228build looks like a decompiler-renamed build() — confirm against TagRecord's builder.
        final byte[] updatedValue = currentValue.length == 0
                ? new byte[0]
                : TagRecord.fromBytes(currentValue).toBuilder().removeStream(str2).m228build().toBytes();
        final byte[] key = str.getBytes(StandardCharsets.UTF_8);
        return TableSegmentEntry.versioned(key, updatedValue, tableSegmentEntry.getKey().getVersion().getSegmentVersion());
    }

    /**
     * Tells whether a tag entry no longer references any stream: its value is either empty or
     * decodes to a {@link TagRecord} whose stream collection is empty.
     *
     * @param tableSegmentEntry entry to inspect.
     * @return true if the entry holds no streams.
     */
    private boolean isEmptyTagRecord(TableSegmentEntry tableSegmentEntry) {
        final byte[] value = storeHelper.getArray(tableSegmentEntry.getValue());
        if (value.length == 0) {
            return true;
        }
        return TagRecord.fromBytes(value).getStreams().isEmpty();
    }

    /**
     * Removes the stream's entry from this scope's streams table.
     *
     * @param str              name of the stream to remove.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the removal has been issued and finished.
     */
    public CompletableFuture<Void> removeStreamFromScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getStreamsInScopeTableName(operationContext).thenCompose(streamsTable ->
                Futures.toVoid(storeHelper.removeEntry(streamsTable, str, operationContext.getRequestId())));
    }

    /**
     * Checks whether this scope's streams table has an entry for the given stream.
     *
     * @param str              name of the stream to look for.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding true if the entry could be read, false if the read reported
     *         data-not-found.
     */
    public CompletableFuture<Boolean> checkStreamExistsInScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getStreamsInScopeTableName(operationContext).thenCompose(streamsTable ->
                storeHelper.expectingDataNotFound(
                        // The value itself is irrelevant; only a successful read matters.
                        storeHelper.getEntry(streamsTable, str, rawValue -> rawValue, operationContext.getRequestId())
                                .thenApply(found -> true),
                        false));
    }

    /**
     * Checks whether this scope's key-value-tables table has an entry for the given name.
     *
     * @param str              name of the key-value table to look for.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding true if the entry could be read, false if the read reported
     *         data-not-found.
     */
    public CompletableFuture<Boolean> checkKeyValueTableExistsInScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getKVTablesInScopeTableName(operationContext).thenCompose(kvTablesTable ->
                storeHelper.expectingDataNotFound(
                        // The value itself is irrelevant; only a successful read matters.
                        storeHelper.getEntry(kvTablesTable, str, rawValue -> rawValue, operationContext.getRequestId())
                                .thenApply(found -> true),
                        false));
    }

    /**
     * Checks whether this scope's reader-groups table has an entry for the given name.
     *
     * @param str              name of the reader group to look for.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding true if the entry could be read, false if the read reported
     *         data-not-found.
     */
    public CompletableFuture<Boolean> checkReaderGroupExistsInScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getReaderGroupsInScopeTableName(operationContext).thenCompose(readerGroupsTable ->
                storeHelper.expectingDataNotFound(
                        // The value itself is irrelevant; only a successful read matters.
                        storeHelper.getEntry(readerGroupsTable, str, rawValue -> rawValue, operationContext.getRequestId())
                                .thenApply(found -> true),
                        false));
    }

    /**
     * Adds an entry mapping the key-value table's name to its id in this scope's
     * key-value-tables table if none exists yet, creating the table first when the initial
     * attempt reports data-not-found.
     *
     * @param str              name of the key-value table.
     * @param uuid             id stored as the entry's value.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the entry has been added (or was already present).
     */
    public CompletableFuture<Void> addKVTableToScope(String str, UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getKVTablesInScopeTableName(operationContext).thenCompose(kvTablesTable ->
                Futures.toVoid(withCreateTableIfAbsent(
                        () -> storeHelper.addNewEntryIfAbsent(kvTablesTable, str, uuid,
                                PravegaTablesStoreHelper.UUID_TO_BYTES_FUNCTION, operationContext.getRequestId()),
                        kvTablesTable, operationContext)));
    }

    /**
     * Removes the key-value table's entry from this scope's key-value-tables table.
     *
     * @param str              name of the key-value table to remove.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the removal has finished.
     */
    public CompletableFuture<Void> removeKVTableFromScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getKVTablesInScopeTableName(operationContext).thenCompose(kvTablesTable ->
                Futures.toVoid(storeHelper.removeEntry(kvTablesTable, str, operationContext.getRequestId())));
    }

    /**
     * Adds an entry mapping the reader group's name to its id in this scope's reader-groups
     * table if none exists yet, creating the table first when the initial attempt reports
     * data-not-found.
     *
     * @param str              name of the reader group.
     * @param uuid             id stored as the entry's value.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the entry has been added (or was already present).
     */
    public CompletableFuture<Void> addReaderGroupToScope(String str, UUID uuid, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getReaderGroupsInScopeTableName(operationContext).thenCompose(readerGroupsTable ->
                Futures.toVoid(withCreateTableIfAbsent(
                        () -> storeHelper.addNewEntryIfAbsent(readerGroupsTable, str, uuid,
                                PravegaTablesStoreHelper.UUID_TO_BYTES_FUNCTION, operationContext.getRequestId()),
                        readerGroupsTable, operationContext)));
    }

    /**
     * Removes the reader group's entry from this scope's reader-groups table.
     *
     * @param str              name of the reader group to remove.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future that completes when the removal has finished.
     */
    public CompletableFuture<Void> removeReaderGroupFromScope(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getReaderGroupsInScopeTableName(operationContext).thenCompose(readerGroupsTable ->
                Futures.toVoid(storeHelper.removeEntry(readerGroupsTable, str, operationContext.getRequestId())));
    }

    /**
     * Looks up the id stored for a reader group in this scope's reader-groups table.
     *
     * @param str              name of the reader group.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the id decoded from the reader group's entry.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<UUID> getReaderGroupId(String str, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        return getReaderGroupsInScopeTableName(operationContext).thenCompose(readerGroupsTable ->
                storeHelper.getEntry(readerGroupsTable, str, PravegaTablesStoreHelper.BYTES_TO_UUID_FUNCTION, operationContext.getRequestId())
                        .thenApply(idEntry -> idEntry.getObject()));
    }

    /**
     * Reads one page of key-value table names from this scope's key-value-tables table.
     *
     * @param i                maximum number of keys requested for the page.
     * @param str              Base64-encoded continuation token to resume from.
     * @param executor         not used by this implementation.
     * @param operationContext context of the calling operation; must not be null.
     * @return a future holding the names read and the token for the next page.
     */
    @Override // io.pravega.controller.store.Scope
    public CompletableFuture<Pair<List<String>, String>> listKeyValueTables(int i, String str, Executor executor, OperationContext operationContext) {
        Preconditions.checkNotNull(operationContext, "Operation context cannot be null");
        final CompletableFuture<String> kvTablesTable = getKVTablesInScopeTableName(operationContext);
        return kvTablesTable.thenCompose(tableName -> readAll(i, str, tableName, operationContext));
    }

    /**
     * Runs the supplied operation; if it fails with a data-not-found condition, creates the
     * named table and runs the operation a second time.
     *
     * @param supplier         operation to run; invoked once, or twice when the table had to be created.
     * @param str              name of the table to create on data-not-found.
     * @param operationContext context of the calling operation.
     * @param <T>              result type of the operation.
     * @return a future holding the result of the first successful attempt.
     */
    private <T> CompletableFuture<T> withCreateTableIfAbsent(Supplier<CompletableFuture<T>> supplier, String str, OperationContext operationContext) {
        final CompletableFuture<T> firstAttempt = supplier.get();
        return Futures.exceptionallyComposeExpecting(firstAttempt, PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                () -> storeHelper.createTable(str, operationContext.getRequestId())
                        .thenCompose(tableCreated -> supplier.get()));
    }

    /**
     * Reads one page of keys from a table, starting at the position encoded in the token.
     *
     * <p>The token is Base64-decoded into the position buffer handed to the store helper; the
     * buffer returned with the page is Base64-encoded into the next token. On a data-not-found
     * failure the result is an empty list paired with the incoming token.
     *
     * @param i                maximum number of keys requested.
     * @param str              Base64-encoded continuation token.
     * @param str2             name of the table to read.
     * @param operationContext context of the calling operation.
     * @return a future holding the keys read and the token for the next page.
     */
    private CompletableFuture<Pair<List<String>, String>> readAll(int i, String str, String str2, OperationContext operationContext) {
        final List<String> keys = new ArrayList<>();
        final ByteBuf startPosition = Unpooled.wrappedBuffer(Base64.getDecoder().decode(str));
        return Futures.exceptionallyExpecting(
                this.storeHelper.getKeysPaginated(str2, startPosition, i, operationContext.getRequestId())
                        .thenApply(page -> {
                            keys.addAll(page.getValue());
                            String nextToken = Base64.getEncoder().encodeToString(((ByteBuf) page.getKey()).array());
                            return new ImmutablePair<>(keys, nextToken);
                        }),
                PravegaTablesStreamMetadataStore.DATA_NOT_FOUND_PREDICATE,
                new ImmutablePair<>(Collections.emptyList(), str));
    }
}
