package io.pravega.controller.server.rest.resources;

import io.pravega.auth.AuthException;
import io.pravega.auth.AuthHandler;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.impl.ReaderGroupManagerImpl;
import io.pravega.client.connection.impl.ConnectionFactory;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.ClientFactoryImpl;
import io.pravega.common.LoggerHelpers;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.common.util.AsyncIterator;
import io.pravega.controller.server.ControllerService;
import io.pravega.controller.server.eventProcessor.LocalController;
import io.pravega.controller.server.rest.ModelHelper;
import io.pravega.controller.server.rest.generated.model.CreateScopeRequest;
import io.pravega.controller.server.rest.generated.model.CreateStreamRequest;
import io.pravega.controller.server.rest.generated.model.ReaderGroupProperty;
import io.pravega.controller.server.rest.generated.model.ReaderGroupsList;
import io.pravega.controller.server.rest.generated.model.ReaderGroupsListReaderGroups;
import io.pravega.controller.server.rest.generated.model.ScopeProperty;
import io.pravega.controller.server.rest.generated.model.ScopesList;
import io.pravega.controller.server.rest.generated.model.StreamState;
import io.pravega.controller.server.rest.generated.model.StreamsList;
import io.pravega.controller.server.rest.generated.model.UpdateStreamRequest;
import io.pravega.controller.server.rest.v1.ApiV1;
import io.pravega.controller.store.stream.ScaleMetadata;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.stream.api.grpc.v1.Controller;
import io.pravega.shared.NameUtils;
import io.pravega.shared.rest.security.AuthHandlerManager;
import io.pravega.shared.rest.security.RESTAuthHelper;
import io.pravega.shared.security.auth.AuthorizationResource;
import io.pravega.shared.security.auth.AuthorizationResourceImpl;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/rest/resources/StreamMetadataResourceImpl.class */
public class StreamMetadataResourceImpl implements ApiV1.ScopesApi {
    private static final TagLogger log = new TagLogger(LoggerFactory.getLogger(StreamMetadataResourceImpl.class));

    @Context
    HttpHeaders headers;
    private final ControllerService controllerService;
    private final RESTAuthHelper restAuthHelper;
    private final LocalController localController;
    private final ConnectionFactory connectionFactory;
    private final AuthorizationResource authorizationResource = new AuthorizationResourceImpl();
    private final Random requestIdGenerator = new Random();
    private final ClientConfig clientConfig;

    public StreamMetadataResourceImpl(LocalController localController, ControllerService controllerService, AuthHandlerManager authHandlerManager, ConnectionFactory connectionFactory, ClientConfig clientConfig) {
        this.localController = localController;
        this.controllerService = controllerService;
        this.restAuthHelper = new RESTAuthHelper(authHandlerManager);
        this.connectionFactory = connectionFactory;
        this.clientConfig = clientConfig;
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void createScope(CreateScopeRequest createScopeRequest, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long nextLong = this.requestIdGenerator.nextLong();
        long traceEnter = LoggerHelpers.traceEnter(log, "createScope", new Object[0]);
        try {
            NameUtils.validateUserScopeName(createScopeRequest.getScopeName());
            try {
                this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofScopes(), AuthHandler.Permissions.READ_UPDATE);
                CompletableFuture exceptionally = this.controllerService.createScope(createScopeRequest.getScopeName(), nextLong).thenApply(createScopeStatus -> {
                    if (createScopeStatus.getStatus() == Controller.CreateScopeStatus.Status.SUCCESS) {
                        log.info(nextLong, "Successfully created new scope: {}", new Object[]{createScopeRequest.getScopeName()});
                        return Response.status(Response.Status.CREATED).entity(new ScopeProperty().scopeName(createScopeRequest.getScopeName())).build();
                    }
                    if (createScopeStatus.getStatus() == Controller.CreateScopeStatus.Status.SCOPE_EXISTS) {
                        log.warn(nextLong, "Scope name: {} already exists", new Object[]{createScopeRequest.getScopeName()});
                        return Response.status(Response.Status.CONFLICT).build();
                    }
                    log.warn(nextLong, "Failed to create scope: {}", new Object[]{createScopeRequest.getScopeName()});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    log.warn(nextLong, "createScope for scope: {} failed, exception: {}", new Object[]{createScopeRequest.getScopeName(), th});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool -> {
                    LoggerHelpers.traceLeave(log, "createScope", traceEnter, new Object[0]);
                });
            } catch (AuthException e) {
                log.warn(nextLong, "Create scope for {} failed due to authentication failure {}.", new Object[]{createScopeRequest.getScopeName(), e});
                asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
                LoggerHelpers.traceLeave(log, "createScope", traceEnter, new Object[0]);
            }
        } catch (IllegalArgumentException | NullPointerException e2) {
            log.warn(nextLong, "Create scope failed due to invalid scope name {}", new Object[]{createScopeRequest.getScopeName()});
            asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
            LoggerHelpers.traceLeave(log, "createScope", traceEnter, new Object[0]);
        }
    }

    private List<String> getAuthorizationHeader() {
        return this.headers.getRequestHeader("Authorization");
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void createStream(String str, CreateStreamRequest createStreamRequest, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "createStream", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        String streamName = createStreamRequest.getStreamName();
        try {
            NameUtils.validateUserStreamName(streamName);
            try {
                this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamsInScope(str), AuthHandler.Permissions.READ_UPDATE);
                StreamConfiguration createStreamConfig = ModelHelper.getCreateStreamConfig(createStreamRequest);
                CompletableFuture exceptionally = this.controllerService.createStream(str, streamName, createStreamConfig, System.currentTimeMillis(), nextLong).thenApply(createStreamStatus -> {
                    Response build;
                    if (createStreamStatus.getStatus() == Controller.CreateStreamStatus.Status.SUCCESS) {
                        log.info(nextLong, "Successfully created stream: {}/{}", new Object[]{str, streamName});
                        build = Response.status(Response.Status.CREATED).entity(ModelHelper.encodeStreamResponse(str, streamName, createStreamConfig)).build();
                    } else if (createStreamStatus.getStatus() == Controller.CreateStreamStatus.Status.STREAM_EXISTS) {
                        log.warn(nextLong, "Stream already exists: {}/{}", new Object[]{str, streamName});
                        build = Response.status(Response.Status.CONFLICT).build();
                    } else if (createStreamStatus.getStatus() == Controller.CreateStreamStatus.Status.SCOPE_NOT_FOUND) {
                        log.warn(nextLong, "Scope not found: {}", new Object[]{str});
                        build = Response.status(Response.Status.NOT_FOUND).build();
                    } else if (createStreamStatus.getStatus() == Controller.CreateStreamStatus.Status.INVALID_STREAM_NAME) {
                        log.warn(nextLong, "Invalid stream name: {}", new Object[]{streamName});
                        build = Response.status(Response.Status.BAD_REQUEST).build();
                    } else {
                        log.warn(nextLong, "createStream failed for : {}/{}", new Object[]{str, streamName});
                        build = Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                    }
                    return build;
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    log.warn(nextLong, "createStream for {}/{} failed: ", new Object[]{str, streamName, th});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool -> {
                    LoggerHelpers.traceLeave(log, "createStream", traceEnter, new Object[0]);
                });
            } catch (AuthException e) {
                log.warn(nextLong, "Create stream for {} failed due to authentication failure.", new Object[]{streamName});
                asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
                LoggerHelpers.traceLeave(log, "createStream", traceEnter, new Object[0]);
            }
        } catch (IllegalArgumentException | NullPointerException e2) {
            log.warn(nextLong, "Create stream failed due to invalid stream name {}", new Object[]{streamName});
            asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
            LoggerHelpers.traceLeave(log, "createStream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void deleteScope(String str, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "deleteScope", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofScopes(), AuthHandler.Permissions.READ_UPDATE);
            CompletableFuture exceptionally = this.controllerService.deleteScope(str, nextLong).thenApply(deleteScopeStatus -> {
                if (deleteScopeStatus.getStatus() == Controller.DeleteScopeStatus.Status.SUCCESS) {
                    log.info(nextLong, "Successfully deleted scope: {}", new Object[]{str});
                    return Response.status(Response.Status.NO_CONTENT).build();
                }
                if (deleteScopeStatus.getStatus() == Controller.DeleteScopeStatus.Status.SCOPE_NOT_FOUND) {
                    log.warn(nextLong, "Scope: {} not found", new Object[]{str});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                if (deleteScopeStatus.getStatus() == Controller.DeleteScopeStatus.Status.SCOPE_NOT_EMPTY) {
                    log.warn(nextLong, "Cannot delete scope: {} with non-empty streams", new Object[]{str});
                    return Response.status(Response.Status.PRECONDITION_FAILED).build();
                }
                log.warn(nextLong, "deleteScope for {} failed", new Object[]{str});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.warn(nextLong, "deleteScope for {} failed with exception: {}", new Object[]{str, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "deleteScope", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Delete scope for {} failed due to authentication failure.", new Object[]{str});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "createStream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void deleteStream(String str, String str2, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "deleteStream", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamInScope(str, str2), AuthHandler.Permissions.READ_UPDATE);
            CompletableFuture exceptionally = this.controllerService.deleteStream(str, str2, nextLong).thenApply(deleteStreamStatus -> {
                if (deleteStreamStatus.getStatus() == Controller.DeleteStreamStatus.Status.SUCCESS) {
                    log.info(nextLong, "Successfully deleted stream: {}", new Object[]{str2});
                    return Response.status(Response.Status.NO_CONTENT).build();
                }
                if (deleteStreamStatus.getStatus() == Controller.DeleteStreamStatus.Status.STREAM_NOT_FOUND) {
                    log.warn(nextLong, "Scope: {}, Stream {} not found", new Object[]{str, str2});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                if (deleteStreamStatus.getStatus() == Controller.DeleteStreamStatus.Status.STREAM_NOT_SEALED) {
                    log.warn(nextLong, "Cannot delete unsealed stream: {}", new Object[]{str2});
                    return Response.status(Response.Status.PRECONDITION_FAILED).build();
                }
                log.warn(nextLong, "deleteStream for {} failed", new Object[]{str2});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.warn(nextLong, "deleteStream for {} failed with exception: {}", new Object[]{str2, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "deleteStream", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Delete stream for {} failed due to authentication failure.", new Object[]{str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "deleteStream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void getReaderGroup(String str, String str2, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "getReaderGroup", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofReaderGroupInScope(str, str2), AuthHandler.Permissions.READ);
            ClientFactoryImpl clientFactoryImpl = new ClientFactoryImpl(str, this.localController, this.clientConfig);
            ReaderGroupManagerImpl readerGroupManagerImpl = new ReaderGroupManagerImpl(str, this.localController, clientFactoryImpl);
            ReaderGroupProperty readerGroupProperty = new ReaderGroupProperty();
            readerGroupProperty.setScopeName(str);
            readerGroupProperty.setReaderGroupName(str2);
            CompletableFuture.supplyAsync(() -> {
                ReaderGroup readerGroup = readerGroupManagerImpl.getReaderGroup(str2);
                readerGroupProperty.setOnlineReaderIds(new ArrayList(readerGroup.getOnlineReaders()));
                readerGroupProperty.setStreamList(new ArrayList(readerGroup.getStreamNames()));
                return Response.status(Response.Status.OK).entity(readerGroupProperty).build();
            }, this.controllerService.getExecutor()).exceptionally(th -> {
                log.warn(nextLong, "getReaderGroup for {} failed with exception: ", new Object[]{str2, th});
                return th.getCause() instanceof ReaderGroupNotFoundException ? Response.status(Response.Status.NOT_FOUND).build() : Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            }).thenAccept(response -> {
                asyncResponse.resume(response);
                readerGroupManagerImpl.close();
                clientFactoryImpl.close();
                LoggerHelpers.traceLeave(log, "getReaderGroup", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Get reader group for {} failed due to authentication failure.", new Object[]{str + "/" + str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "getReaderGroup", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void getScope(String str, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "getScope", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofScope(str), AuthHandler.Permissions.READ);
            CompletableFuture exceptionally = this.controllerService.getScope(str, nextLong).thenApply(str2 -> {
                return Response.status(Response.Status.OK).entity(new ScopeProperty().scopeName(str2)).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                if (th.getCause() instanceof StoreException.DataNotFoundException) {
                    log.warn(nextLong, "Scope: {} not found", new Object[]{str});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                log.warn(nextLong, "getScope for {} failed with exception: {}", new Object[]{str, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "getScope", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Get scope for {} failed due to authentication failure.", new Object[]{str});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "getScope", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void getStream(String str, String str2, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "getStream", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamInScope(str, str2), AuthHandler.Permissions.READ);
            CompletableFuture exceptionally = this.controllerService.getStream(str, str2, nextLong).thenApply(streamConfiguration -> {
                return Response.status(Response.Status.OK).entity(ModelHelper.encodeStreamResponse(str, str2, streamConfiguration)).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                if ((th.getCause() instanceof StoreException.DataNotFoundException) || (th instanceof StoreException.DataNotFoundException)) {
                    log.warn(nextLong, "Stream: {}/{} not found", new Object[]{str, str2});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                log.warn(nextLong, "getStream for {}/{} failed with exception: {}", new Object[]{str, str2, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "getStream", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Get stream for {} failed due to authentication failure.", new Object[]{str + "/" + str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "getStream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void listReaderGroups(String str, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "listReaderGroups", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofReaderGroupsInScope(str), AuthHandler.Permissions.READ);
            CompletableFuture exceptionally = this.controllerService.listStreamsInScope(str, nextLong).thenApply(map -> {
                ReaderGroupsList readerGroupsList = new ReaderGroupsList();
                map.forEach((str2, streamConfiguration) -> {
                    if (str2.startsWith("_RG")) {
                        ReaderGroupsListReaderGroups readerGroupsListReaderGroups = new ReaderGroupsListReaderGroups();
                        readerGroupsListReaderGroups.setReaderGroupName(str2.substring("_RG".length()));
                        readerGroupsList.addReaderGroupsItem(readerGroupsListReaderGroups);
                    }
                });
                log.info(nextLong, "Successfully fetched readerGroups for scope: {}", new Object[]{str});
                return Response.status(Response.Status.OK).entity(readerGroupsList).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                if ((th.getCause() instanceof StoreException.DataNotFoundException) || (th instanceof StoreException.DataNotFoundException)) {
                    log.warn(nextLong, "Scope name: {} not found", new Object[]{str});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                log.warn(nextLong, "listReaderGroups for {} failed with exception: ", new Object[]{str, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "listReaderGroups", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Get reader groups for {} failed due to authentication failure.", new Object[]{str});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "listReaderGroups", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void listScopes(SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "listScopes", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        List<String> authorizationHeader = getAuthorizationHeader();
        try {
            Principal authenticate = this.restAuthHelper.authenticate(authorizationHeader);
            this.restAuthHelper.authorize(authorizationHeader, this.authorizationResource.ofScopes(), authenticate, AuthHandler.Permissions.READ);
            this.controllerService.listScopes(nextLong).thenApply(list -> {
                ScopesList scopesList = new ScopesList();
                list.forEach(str -> {
                    try {
                        if (this.restAuthHelper.isAuthorized(authorizationHeader, this.authorizationResource.ofScope(str), authenticate, AuthHandler.Permissions.READ)) {
                            scopesList.addScopesItem(new ScopeProperty().scopeName(str));
                        }
                    } catch (AuthException e) {
                        log.warn(nextLong, e.getMessage(), new Object[]{e});
                    }
                });
                return Response.status(Response.Status.OK).entity(scopesList).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.warn(nextLong, "listScopes failed with exception: ", new Object[]{th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            }).thenApply(response -> {
                asyncResponse.resume(response);
                LoggerHelpers.traceLeave(log, "listScopes", traceEnter, new Object[0]);
                return response;
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Get scopes failed due to authentication failure.", new Object[]{e});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "listScopes", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void listStreams(String str, String str2, String str3, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "listStreams", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        List<String> authorizationHeader = getAuthorizationHeader();
        try {
            Principal authenticate = this.restAuthHelper.authenticate(authorizationHeader);
            this.restAuthHelper.authorize(authorizationHeader, this.authorizationResource.ofStreamsInScope(str), authenticate, AuthHandler.Permissions.READ);
            boolean z = str2 != null && str2.equals("showInternalStreams");
            if (!(str2 != null && str2.equals("tag")) || str3 == null) {
                CompletableFuture exceptionally = this.controllerService.listStreamsInScope(str, nextLong).thenApply(map -> {
                    StreamsList streamsList = new StreamsList();
                    streamsList.setStreams(new ArrayList());
                    map.forEach((str4, streamConfiguration) -> {
                        try {
                            if (this.restAuthHelper.isAuthorized(authorizationHeader, this.authorizationResource.ofStreamInScope(str, str4), authenticate, AuthHandler.Permissions.READ)) {
                                if ((!z) ^ str4.startsWith("_")) {
                                    streamsList.addStreamsItem(ModelHelper.encodeStreamResponse(str, str4, streamConfiguration));
                                }
                            }
                        } catch (AuthException e) {
                            log.warn(nextLong, "Read internal streams for scope {} failed due to authentication failure.", new Object[]{str});
                        }
                    });
                    log.info(nextLong, "Successfully fetched streams for scope: {}", new Object[]{str});
                    return Response.status(Response.Status.OK).entity(streamsList).build();
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    if ((th.getCause() instanceof StoreException.DataNotFoundException) || (th instanceof StoreException.DataNotFoundException)) {
                        log.warn(nextLong, "Scope name: {} not found", new Object[]{str});
                        return Response.status(Response.Status.NOT_FOUND).build();
                    }
                    log.warn(nextLong, "listStreams for {} failed with exception: {}", new Object[]{str, th});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool -> {
                    LoggerHelpers.traceLeave(log, "listStreams", traceEnter, new Object[0]);
                });
            } else {
                ArrayList arrayList = new ArrayList();
                AsyncIterator<Stream> listStreamsForTag = this.localController.listStreamsForTag(str, str3);
                Objects.requireNonNull(arrayList);
                CompletableFuture exceptionally2 = listStreamsForTag.collectRemaining((v1) -> {
                    return r1.add(v1);
                }).thenCompose(r17 -> {
                    return Futures.allOfWithResults((List) arrayList.stream().filter(stream -> {
                        boolean z2 = false;
                        try {
                            z2 = this.restAuthHelper.isAuthorized(authorizationHeader, this.authorizationResource.ofStreamInScope(str, stream.getStreamName()), authenticate, AuthHandler.Permissions.READ);
                        } catch (AuthException e) {
                            log.warn(nextLong, "List Streams with tag {} for scope {} failed due to authentication failure.", new Object[]{str3, str});
                        }
                        return z2;
                    }).map(stream2 -> {
                        return this.localController.getStreamConfiguration(str, stream2.getStreamName()).thenApply(streamConfiguration -> {
                            return new ImmutablePair(stream2, streamConfiguration);
                        });
                    }).collect(Collectors.toList()));
                }).thenApply(list -> {
                    StreamsList streamsList = new StreamsList();
                    streamsList.setStreams(new ArrayList());
                    list.forEach(immutablePair -> {
                        streamsList.addStreamsItem(ModelHelper.encodeStreamResponse(((Stream) immutablePair.left).getScope(), ((Stream) immutablePair.left).getStreamName(), (StreamConfiguration) immutablePair.right));
                    });
                    log.info(nextLong, "Successfully fetched streams for scope: {} with tag: {}", new Object[]{str, str3});
                    return Response.status(Response.Status.OK).entity(streamsList).build();
                }).exceptionally(th2 -> {
                    if ((th2.getCause() instanceof StoreException.DataNotFoundException) || (th2 instanceof StoreException.DataNotFoundException)) {
                        log.warn(nextLong, "Scope name: {} not found", new Object[]{str});
                        return Response.status(Response.Status.NOT_FOUND).build();
                    }
                    log.warn(nextLong, "listStreams for {} with tag {} failed with exception: {}", new Object[]{str, str3, th2});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally2.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool2 -> {
                    LoggerHelpers.traceLeave(log, "listStreams", traceEnter, new Object[0]);
                });
            }
        } catch (AuthException e) {
            log.warn(nextLong, "List streams for {} failed due to authentication failure.", new Object[]{str});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "listStreams", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void updateStream(String str, String str2, UpdateStreamRequest updateStreamRequest, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "updateStream", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamInScope(str, str2), AuthHandler.Permissions.READ_UPDATE);
            StreamConfiguration updateStreamConfig = ModelHelper.getUpdateStreamConfig(updateStreamRequest);
            CompletableFuture exceptionally = this.controllerService.updateStream(str, str2, updateStreamConfig, nextLong).thenApply(updateStreamStatus -> {
                if (updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.SUCCESS) {
                    log.info(nextLong, "Successfully updated stream config for: {}/{}", new Object[]{str, str2});
                    return Response.status(Response.Status.OK).entity(ModelHelper.encodeStreamResponse(str, str2, updateStreamConfig)).build();
                }
                if (updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.STREAM_NOT_FOUND || updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.SCOPE_NOT_FOUND) {
                    log.warn(nextLong, "Stream: {}/{} not found", new Object[]{str, str2});
                    return Response.status(Response.Status.NOT_FOUND).build();
                }
                log.warn(nextLong, "updateStream failed for {}/{}", new Object[]{str, str2});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                log.warn(nextLong, "updateStream for {}/{} failed with exception: {}", new Object[]{str, str2, th});
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            });
            Objects.requireNonNull(asyncResponse);
            exceptionally.thenApply((v1) -> {
                return r1.resume(v1);
            }).thenAccept(bool -> {
                LoggerHelpers.traceLeave(log, "updateStream", traceEnter, new Object[0]);
            });
        } catch (AuthException e) {
            log.warn(nextLong, "Update stream for {} failed due to authentication failure.", new Object[]{str + "/" + str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "Update stream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void updateStreamState(String str, String str2, StreamState streamState, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "updateStreamState", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamInScope(str, str2), AuthHandler.Permissions.READ_UPDATE);
            if (streamState.getStreamState() != StreamState.StreamStateEnum.SEALED) {
                log.warn(nextLong, "Received invalid stream state: {} from client for stream {}/{}", new Object[]{streamState.getStreamState(), str, str2});
                asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
            } else {
                CompletableFuture exceptionally = this.controllerService.sealStream(str, str2, nextLong).thenApply(updateStreamStatus -> {
                    if (updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.SUCCESS) {
                        log.info(nextLong, "Successfully sealed stream: {}", new Object[]{str2});
                        return Response.status(Response.Status.OK).entity(streamState).build();
                    }
                    if (updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.SCOPE_NOT_FOUND || updateStreamStatus.getStatus() == Controller.UpdateStreamStatus.Status.STREAM_NOT_FOUND) {
                        log.warn(nextLong, "Scope: {} or Stream {} not found", new Object[]{str, str2});
                        return Response.status(Response.Status.NOT_FOUND).build();
                    }
                    log.warn(nextLong, "updateStreamState for {} failed", new Object[]{str2});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    log.warn(nextLong, "updateStreamState for {} failed with exception: {}", new Object[]{str2, th});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool -> {
                    LoggerHelpers.traceLeave(log, "updateStreamState", traceEnter, new Object[0]);
                });
            }
        } catch (AuthException e) {
            log.warn(nextLong, "Update stream for {} failed due to authentication failure.", new Object[]{str + "/" + str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "Update stream", traceEnter, new Object[0]);
        }
    }

    @Override // io.pravega.controller.server.rest.v1.ApiV1.ScopesApi
    public void getScalingEvents(String str, String str2, Long l, Long l2, SecurityContext securityContext, AsyncResponse asyncResponse) {
        long traceEnter = LoggerHelpers.traceEnter(log, "getScalingEvents", new Object[0]);
        long nextLong = this.requestIdGenerator.nextLong();
        if (l == null || l2 == null) {
            log.warn(nextLong, "Received an invalid request with missing query parameters for scopeName/streamName: {}/{}", new Object[]{str, str2});
            asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
            LoggerHelpers.traceLeave(log, "getScalingEvents", traceEnter, new Object[0]);
            return;
        }
        try {
            this.restAuthHelper.authenticateAuthorize(getAuthorizationHeader(), this.authorizationResource.ofStreamInScope(str, str2), AuthHandler.Permissions.READ);
            if (l.longValue() < 0 || l2.longValue() < 0 || l.longValue() > l2.longValue()) {
                log.warn(nextLong, "Received invalid request from client for scopeName/streamName: {}/{} ", new Object[]{str, str2});
                asyncResponse.resume(Response.status(Response.Status.BAD_REQUEST).build());
                LoggerHelpers.traceLeave(log, "getScalingEvents", traceEnter, new Object[0]);
            } else {
                CompletableFuture exceptionally = this.controllerService.getScaleRecords(str, str2, l.longValue(), l2.longValue(), nextLong).thenApply(list -> {
                    Iterator it = list.iterator();
                    ArrayList arrayList = new ArrayList();
                    ScaleMetadata scaleMetadata = null;
                    while (it.hasNext()) {
                        ScaleMetadata scaleMetadata2 = (ScaleMetadata) it.next();
                        if (scaleMetadata2.getTimestamp() >= l.longValue() && scaleMetadata2.getTimestamp() <= l2.longValue()) {
                            arrayList.add(scaleMetadata2);
                        } else if (scaleMetadata2.getTimestamp() < l.longValue() && (scaleMetadata == null || scaleMetadata.getTimestamp() <= scaleMetadata2.getTimestamp())) {
                            scaleMetadata = scaleMetadata2;
                        }
                    }
                    if (scaleMetadata != null) {
                        arrayList.add(0, scaleMetadata);
                    }
                    log.info(nextLong, "Successfully fetched required scaling events for scope: {}, stream: {}", new Object[]{str, str2});
                    return Response.status(Response.Status.OK).entity(arrayList).build();
                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                    if ((th.getCause() instanceof StoreException.DataNotFoundException) || (th instanceof StoreException.DataNotFoundException)) {
                        log.warn(nextLong, "Stream/Scope name: {}/{} not found", new Object[]{str, str2});
                        return Response.status(Response.Status.NOT_FOUND).build();
                    }
                    log.warn(nextLong, "getScalingEvents for scopeName/streamName: {}/{} failed with exception ", new Object[]{str, str2, th});
                    return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
                });
                Objects.requireNonNull(asyncResponse);
                exceptionally.thenApply((v1) -> {
                    return r1.resume(v1);
                }).thenAccept(bool -> {
                    LoggerHelpers.traceLeave(log, "getScalingEvents", traceEnter, new Object[0]);
                });
            }
        } catch (AuthException e) {
            log.warn(nextLong, "Get scaling events for {} failed due to authentication failure.", new Object[]{str + "/" + str2});
            asyncResponse.resume(Response.status(Response.Status.fromStatusCode(e.getResponseCode())).build());
            LoggerHelpers.traceLeave(log, "Get scaling events", traceEnter, new Object[0]);
        }
    }
}
