package io.pravega.client.admin.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.KeyValueTableInfo;
import io.pravega.client.admin.StreamInfo;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.connection.impl.ConnectionPoolImpl;
import io.pravega.client.connection.impl.SocketConnectionFactoryImpl;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.control.impl.ControllerImpl;
import io.pravega.client.control.impl.ControllerImplConfig;
import io.pravega.client.stream.DeleteScopeFailedException;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.StreamCutImpl;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.function.Callbacks;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shared.NameUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/admin/impl/StreamManagerImpl.class */
/**
 * {@link StreamManager} implementation that administers scopes, streams, key-value tables and
 * reader groups by delegating every request to a {@link Controller}.
 *
 * <p>Each operation waits for the corresponding controller future through
 * {@link Futures#getThrowingException} and hands the outcome back to the caller. Call
 * {@link #close()} once the manager is no longer needed; it closes the controller client and the
 * connection pool held by this instance.
 */
public class StreamManagerImpl implements StreamManager {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamManagerImpl.class);
    private final Controller controller;
    private final ConnectionPool connectionPool;
    private final ScheduledExecutorService executor;
    private final StreamCutHelper streamCutHelper;

    /**
     * Creates a manager whose controller settings are derived from the given client configuration.
     *
     * @param clientConfig client configuration used for both the connection pool and the controller
     */
    public StreamManagerImpl(ClientConfig clientConfig) {
        this(clientConfig, ControllerImplConfig.builder().clientConfig(clientConfig).build());
    }

    /**
     * Creates a manager with its own connection pool and controller client.
     *
     * <p>If the controller client cannot be constructed, the freshly created connection pool is
     * closed before the failure is propagated so that it is not leaked.
     *
     * @param clientConfig client configuration used to build the connection pool
     * @param controllerImplConfig configuration used to build the controller client
     */
    @VisibleForTesting
    public StreamManagerImpl(ClientConfig clientConfig, ControllerImplConfig controllerImplConfig) {
        ConnectionPool pool = new ConnectionPoolImpl(clientConfig, new SocketConnectionFactoryImpl(clientConfig));
        ScheduledExecutorService internalExecutor;
        Controller createdController;
        try {
            internalExecutor = pool.getInternalExecutor();
            createdController = new ControllerImpl(controllerImplConfig, internalExecutor);
        } catch (RuntimeException | Error e) {
            // Nobody else holds a reference to the pool yet, so release it here.
            try {
                pool.close();
            } catch (RuntimeException closeFailure) {
                // Keep the original failure as the primary one.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.connectionPool = pool;
        this.executor = internalExecutor;
        this.controller = createdController;
        this.streamCutHelper = new StreamCutHelper(createdController, pool);
    }

    /**
     * Creates a manager around an existing controller and connection pool. Both are closed by
     * {@link #close()}.
     *
     * @param controller controller client that receives all requests
     * @param connectionPool connection pool that also supplies the internal executor
     */
    @VisibleForTesting
    public StreamManagerImpl(Controller controller, ConnectionPool connectionPool) {
        this.controller = controller;
        this.connectionPool = connectionPool;
        this.executor = connectionPool.getInternalExecutor();
        this.streamCutHelper = new StreamCutHelper(controller, connectionPool);
    }

    /**
     * Validates the names and asks the controller to create the stream.
     *
     * @param scopeName scope that will contain the stream
     * @param streamName name of the stream to create
     * @param config configuration whose scaling policy, retention policy and tags are forwarded
     * @return the result reported by the controller
     * @throws NullPointerException if {@code config} is null
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean createStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        Preconditions.checkNotNull(config, "config");
        log.info("Creating scope/stream: {}/{} with configuration: {}", scopeName, streamName, config);
        return Futures.getThrowingException(
                this.controller.createStream(scopeName, streamName, forwardedConfiguration(config)));
    }

    /**
     * Validates the names and asks the controller to update the stream's configuration.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream to update
     * @param config configuration whose scaling policy, retention policy and tags are forwarded
     * @return the result reported by the controller
     * @throws NullPointerException if {@code config} is null
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean updateStream(String scopeName, String streamName, StreamConfiguration config) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        Preconditions.checkNotNull(config, "config");
        log.info("Updating scope/stream: {}/{} with configuration: {}", scopeName, streamName, config);
        return Futures.getThrowingException(
                this.controller.updateStream(scopeName, streamName, forwardedConfiguration(config)));
    }

    /**
     * Builds the configuration that is actually sent to the controller.
     *
     * <p>Only the scaling policy, retention policy and tags are copied.
     * NOTE(review): any other property of {@link StreamConfiguration} is left at the builder's
     * default - confirm this is intended.
     */
    private static StreamConfiguration forwardedConfiguration(StreamConfiguration config) {
        return StreamConfiguration.builder()
                .scalingPolicy(config.getScalingPolicy())
                .retentionPolicy(config.getRetentionPolicy())
                .tags(config.getTags())
                .build();
    }

    /**
     * Validates the arguments and asks the controller to truncate the stream at the given cut.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream to truncate
     * @param streamCut position to truncate at; must not be null
     * @return the result reported by the controller
     * @throws NullPointerException if {@code streamCut} is null
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean truncateStream(String scopeName, String streamName, StreamCut streamCut) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        Preconditions.checkNotNull(streamCut);
        log.info("Truncating scope/stream: {}/{} with stream cut: {}", scopeName, streamName, streamCut);
        return Futures.getThrowingException(this.controller.truncateStream(scopeName, streamName, streamCut));
    }

    /**
     * Validates the names and asks the controller to seal the stream.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream to seal
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean sealStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Sealing scope/stream: {}/{}", scopeName, streamName);
        return Futures.getThrowingException(this.controller.sealStream(scopeName, streamName));
    }

    /**
     * Validates the names and asks the controller to delete the stream.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream to delete
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean deleteStream(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope/stream: {}/{}", scopeName, streamName);
        return Futures.getThrowingException(this.controller.deleteStream(scopeName, streamName));
    }

    /**
     * Lists the scopes known to the controller.
     *
     * @return an iterator over scope names, backed by the controller's listing
     */
    @Override // io.pravega.client.admin.StreamManager
    public Iterator<String> listScopes() {
        log.info("Listing scopes");
        return this.controller.listScopes().asIterator();
    }

    /**
     * Validates the name and asks the controller to create the scope.
     *
     * @param scopeName name of the scope to create
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean createScope(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Creating scope: {}", scopeName);
        return Futures.getThrowingException(this.controller.createScope(scopeName));
    }

    /**
     * Asks the controller whether the scope exists. The name is passed through without validation.
     *
     * @param scopeName name of the scope to look up
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean checkScopeExists(String scopeName) {
        log.info("Checking if scope {} exists", scopeName);
        return Futures.getThrowingException(this.controller.checkScopeExists(scopeName));
    }

    /**
     * Lists the streams of a scope.
     *
     * @param scopeName scope whose streams are listed
     * @return an iterator over the streams, backed by the controller's listing
     */
    @Override // io.pravega.client.admin.StreamManager
    public Iterator<Stream> listStreams(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Listing streams in scope: {}", scopeName);
        return this.controller.listStreams(scopeName).asIterator();
    }

    /**
     * Lists the streams of a scope that carry the given tag.
     *
     * @param scopeName scope whose streams are listed
     * @param tagName tag used by the controller to select streams
     * @return an iterator over the matching streams, backed by the controller's listing
     */
    @Override // io.pravega.client.admin.StreamManager
    public Iterator<Stream> listStreams(String scopeName, String tagName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Listing streams in scope: {} which has tag: {}", scopeName, tagName);
        return this.controller.listStreamsForTag(scopeName, tagName).asIterator();
    }

    /**
     * Fetches the stream's configuration from the controller and returns its tags.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream
     * @return the tags found in the stream's configuration
     */
    @Override // io.pravega.client.admin.StreamManager
    public Collection<String> getStreamTags(String scopeName, String streamName) {
        NameUtils.validateUserScopeName(scopeName);
        NameUtils.validateUserStreamName(streamName);
        log.info("Fetching tags associated with stream: {}/{}", scopeName, streamName);
        return Futures.getThrowingException(
                this.controller.getStreamConfiguration(scopeName, streamName).thenApply(StreamConfiguration::getTags));
    }

    /**
     * Asks the controller whether the stream exists. The names are passed through without
     * validation.
     *
     * @param scopeName scope expected to contain the stream
     * @param streamName name of the stream to look up
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean checkStreamExists(String scopeName, String streamName) {
        log.info("Checking if stream {} exists in scope {}", streamName, scopeName);
        return Futures.getThrowingException(this.controller.checkStreamExists(scopeName, streamName));
    }

    /**
     * Validates the name and asks the controller to delete the scope, leaving its contents alone.
     *
     * @param scopeName name of the scope to delete
     * @return the result reported by the controller
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean deleteScope(String scopeName) {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope: {}", scopeName);
        return Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    /**
     * Deletes a scope, optionally removing its contents first.
     *
     * <p>When {@code forceDelete} is set, the scope's streams are sealed and deleted, then its
     * key-value tables are deleted, then the reader groups discovered among the streams are
     * deleted. The scope itself is deleted last in either case.
     *
     * @param scopeName name of the scope to delete
     * @param forceDelete whether to delete the scope's contents before the scope
     * @return the result of the final scope deletion as reported by the controller
     * @throws DeleteScopeFailedException if removing a stream, key-value table or reader group fails
     */
    @Override // io.pravega.client.admin.StreamManager
    public boolean deleteScope(String scopeName, boolean forceDelete) throws DeleteScopeFailedException {
        NameUtils.validateUserScopeName(scopeName);
        log.info("Deleting scope: {}", scopeName);
        if (forceDelete) {
            Collection<String> readerGroupNames = sealAndDeleteStreams(scopeName);
            deleteKeyValueTables(scopeName);
            deleteReaderGroups(scopeName, readerGroupNames);
        }
        return Futures.getThrowingException(this.controller.deleteScope(scopeName));
    }

    /**
     * Seals and deletes every stream listed for the scope, and returns the reader group names
     * derived from streams whose name starts with {@link NameUtils#READER_GROUP_STREAM_PREFIX}.
     */
    private Collection<String> sealAndDeleteStreams(String scopeName) throws DeleteScopeFailedException {
        Collection<String> readerGroupNames = new ArrayList<>();
        Iterator<Stream> streams = listStreams(scopeName);
        while (streams.hasNext()) {
            Stream stream = streams.next();
            String streamName = stream.getStreamName();
            if (streamName.startsWith(NameUtils.READER_GROUP_STREAM_PREFIX)) {
                // Remember the reader group so it can be deleted after streams and tables are gone.
                readerGroupNames.add(streamName.substring(NameUtils.READER_GROUP_STREAM_PREFIX.length()));
            }
            try {
                // An ignorable seal failure is mapped to 'false' so that deletion is still attempted.
                CompletableFuture<Boolean> sealed = Futures.exceptionallyExpecting(
                        this.controller.sealStream(stream.getScope(), streamName),
                        StreamManagerImpl::isIgnorableSealFailure,
                        false);
                Futures.getThrowingException(
                        sealed.thenCompose(ignored -> this.controller.deleteStream(stream.getScope(), streamName)));
            } catch (Exception e) {
                throw new DeleteScopeFailedException(String.format("Failed to seal and delete stream %s", streamName), e);
            }
        }
        return readerGroupNames;
    }

    /** Tells whether a seal failure may be ignored so that the stream deletion can still proceed. */
    private static boolean isIgnorableSealFailure(Throwable failure) {
        Throwable cause = Exceptions.unwrap(failure);
        return cause instanceof InvalidStreamException || cause instanceof ControllerFailureException;
    }

    /** Deletes every key-value table the controller lists for the scope. */
    private void deleteKeyValueTables(String scopeName) throws DeleteScopeFailedException {
        Iterator<KeyValueTableInfo> tables = this.controller.listKeyValueTables(scopeName).asIterator();
        while (tables.hasNext()) {
            KeyValueTableInfo table = tables.next();
            try {
                Futures.getThrowingException(this.controller.deleteKeyValueTable(scopeName, table.getKeyValueTableName()));
            } catch (Exception e) {
                throw new DeleteScopeFailedException(
                        String.format("Failed to delete key-value table %s", table.getKeyValueTableName()), e);
            }
        }
    }

    /** Deletes the given reader groups, skipping any that the controller reports as not found. */
    private void deleteReaderGroups(String scopeName, Collection<String> readerGroupNames)
            throws DeleteScopeFailedException {
        for (String readerGroupName : readerGroupNames) {
            try {
                Futures.getThrowingException(this.controller.getReaderGroupConfig(scopeName, readerGroupName)
                        .thenCompose(config ->
                                this.controller.deleteReaderGroup(scopeName, readerGroupName, config.getReaderGroupId())));
            } catch (Exception e) {
                // A reader group that no longer exists needs no clean-up; anything else is a failure.
                if (!(Exceptions.unwrap(e) instanceof ReaderGroupNotFoundException)) {
                    throw new DeleteScopeFailedException(
                            String.format("Failed to delete reader group %s", readerGroupName), e);
                }
            }
        }
    }

    /**
     * Validates the names and collects the stream's configuration together with its head and tail
     * stream cuts.
     *
     * @param scopeName scope containing the stream
     * @param streamName name of the stream
     * @return the assembled {@link StreamInfo}
     */
    @Override // io.pravega.client.admin.StreamManager
    public StreamInfo getStreamInfo(String scopeName, String streamName) {
        NameUtils.validateUserStreamName(streamName);
        NameUtils.validateUserScopeName(scopeName);
        log.info("Fetching StreamInfo for scope/stream: {}/{}", scopeName, streamName);
        return Futures.getThrowingException(getStreamInfo(Stream.of(scopeName, streamName)));
    }

    /**
     * Starts the configuration, tail and head lookups and combines them once all have completed.
     * The last {@link StreamInfo} argument is true when the tail stream cut has no positions.
     */
    private CompletableFuture<StreamInfo> getStreamInfo(Stream stream) {
        CompletableFuture<StreamConfiguration> configFuture =
                this.controller.getStreamConfiguration(stream.getScope(), stream.getStreamName());
        CompletableFuture<StreamCut> tailFuture = this.streamCutHelper.fetchTailStreamCut(stream);
        CompletableFuture<StreamCut> headFuture = this.streamCutHelper.fetchHeadStreamCut(stream);
        return CompletableFuture.allOf(configFuture, headFuture, tailFuture).thenApply(ignored -> {
            // All three futures are complete here, so join() returns without waiting.
            StreamCut tail = tailFuture.join();
            return new StreamInfo(
                    stream.getScope(),
                    stream.getStreamName(),
                    configFuture.join(),
                    tail,
                    headFuture.join(),
                    ((StreamCutImpl) tail).getPositions().isEmpty());
        });
    }

    /**
     * Closes the controller client and then the connection pool. A failure while closing the
     * controller is logged and does not prevent the pool from being closed.
     */
    @Override // io.pravega.client.admin.StreamManager, java.lang.AutoCloseable
    public void close() {
        if (this.controller != null) {
            Callbacks.invokeSafely(this.controller::close, ex -> log.error("Unable to close Controller client.", ex));
        }
        if (this.connectionPool != null) {
            this.connectionPool.close();
        }
    }
}
