package org.apache.pulsar.functions.worker.rest.api;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.functions.UpdateOptionsImpl;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.auth.FunctionAuthUtils;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.service.api.Sinks;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/SinksImpl.class */
public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerService> {
    private static final Logger log = LoggerFactory.getLogger(SinksImpl.class);

    /* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/SinksImpl$GetSinkStatus.class */
    private class GetSinkStatus extends ComponentImpl.GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
        private GetSinkStatus() {
            super();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notScheduledInstance() {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            sinkInstanceStatusData.setError("Sink has not been scheduled");
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus functionStatus, String str) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(functionStatus.getRunning());
            sinkInstanceStatusData.setError(functionStatus.getFailureException());
            sinkInstanceStatusData.setNumRestarts(functionStatus.getNumRestarts());
            sinkInstanceStatusData.setNumReadFromPulsar(functionStatus.getNumReceived());
            sinkInstanceStatusData.setNumSystemExceptions(functionStatus.getNumSystemExceptions() + functionStatus.getNumUserExceptions() + functionStatus.getNumSourceExceptions());
            LinkedList linkedList = new LinkedList();
            Iterator it = functionStatus.getLatestUserExceptionsList().iterator();
            while (it.hasNext()) {
                linkedList.add(SinksImpl.this.getExceptionInformation((InstanceCommunication.FunctionStatus.ExceptionInformation) it.next()));
            }
            Iterator it2 = functionStatus.getLatestSystemExceptionsList().iterator();
            while (it2.hasNext()) {
                linkedList.add(SinksImpl.this.getExceptionInformation((InstanceCommunication.FunctionStatus.ExceptionInformation) it2.next()));
            }
            Iterator it3 = functionStatus.getLatestSourceExceptionsList().iterator();
            while (it3.hasNext()) {
                linkedList.add(SinksImpl.this.getExceptionInformation((InstanceCommunication.FunctionStatus.ExceptionInformation) it3.next()));
            }
            sinkInstanceStatusData.setLatestSystemExceptions(linkedList);
            sinkInstanceStatusData.setNumSinkExceptions(functionStatus.getNumSinkExceptions());
            LinkedList linkedList2 = new LinkedList();
            Iterator it4 = functionStatus.getLatestSinkExceptionsList().iterator();
            while (it4.hasNext()) {
                linkedList2.add(SinksImpl.this.getExceptionInformation((InstanceCommunication.FunctionStatus.ExceptionInformation) it4.next()));
            }
            sinkInstanceStatusData.setLatestSinkExceptions(linkedList2);
            sinkInstanceStatusData.setNumWrittenToSink(functionStatus.getNumSuccessfullyProcessed());
            sinkInstanceStatusData.setLastReceivedTime(functionStatus.getLastInvocationTime());
            sinkInstanceStatusData.setWorkerId(str);
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notRunning(String str, String str2) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            if (str2 != null) {
                sinkInstanceStatusData.setError(str2);
            }
            sinkInstanceStatusData.setWorkerId(str);
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus getStatus(String str, String str2, String str3, Collection<Function.Assignment> collection, URI uri) throws PulsarAdminException {
            SinkStatus sinkStatus = new SinkStatus();
            for (Function.Assignment assignment : collection) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData componentInstanceStatus = SinksImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId()) ? getComponentInstanceStatus(str, str2, str3, assignment.getInstance().getInstanceId(), null) : SinksImpl.this.worker().getFunctionAdmin().sinks().getSinkStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                sinkInstanceStatus.setStatus(componentInstanceStatus);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus2 -> {
                if (sinkInstanceStatus2.getStatus().isRunning()) {
                    sinkStatus.numRunning++;
                }
            });
            return sinkStatus;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus getStatusExternal(String str, String str2, String str3, int i) {
            SinkStatus sinkStatus = new SinkStatus();
            for (int i2 = 0; i2 < i; i2++) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData componentInstanceStatus = getComponentInstanceStatus(str, str2, str3, i2, null);
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i2);
                sinkInstanceStatus.setStatus(componentInstanceStatus);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus2 -> {
                if (sinkInstanceStatus2.getStatus().isRunning()) {
                    sinkStatus.numRunning++;
                }
            });
            return sinkStatus;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus emptyStatus(int i) {
            SinkStatus sinkStatus = new SinkStatus();
            sinkStatus.setNumInstances(i);
            sinkStatus.setNumRunning(0);
            for (int i2 = 0; i2 < i; i2++) {
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i2);
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
                sinkInstanceStatusData.setRunning(false);
                sinkInstanceStatusData.setError("Sink has not been scheduled");
                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            return sinkStatus;
        }

        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public /* bridge */ /* synthetic */ SinkStatus getStatus(String str, String str2, String str3, Collection collection, URI uri) throws PulsarAdminException {
            return getStatus(str, str2, str3, (Collection<Function.Assignment>) collection, uri);
        }
    }

    /**
     * Creates the sink REST implementation.
     *
     * @param supplier supplier of the worker service, handed to the {@link ComponentImpl} base
     *                 class together with the fixed component type {@code SINK}
     */
    public SinksImpl(Supplier<PulsarWorkerService> supplier) {
        super(supplier, Function.FunctionDetails.ComponentType.SINK);
    }

    /**
     * Registers a new sink.
     *
     * <p>The work happens in sequential phases, each with its own error mapping, so that a failure
     * in a later phase is not re-labelled by the handler of an earlier one:
     * <ol>
     *   <li>argument, authorization, tenant/namespace and "already exists" checks;</li>
     *   <li>package resolution and config validation (failures map to {@code BAD_REQUEST});</li>
     *   <li>runtime admission checks (failures map to {@code BAD_REQUEST});</li>
     *   <li>optional caching of authentication data (failures map to {@code INTERNAL_SERVER_ERROR});</li>
     *   <li>package location resolution (failures map to {@code INTERNAL_SERVER_ERROR});</li>
     *   <li>submission of the metadata through {@code updateRequest}.</li>
     * </ol>
     * A package file obtained during the call is deleted on every exit path unless the package URL
     * starts with {@code "file"}.
     *
     * @param str                        tenant
     * @param str2                       namespace
     * @param str3                       sink name
     * @param inputStream                uploaded package content, may be {@code null}
     * @param formDataContentDisposition upload details, may be {@code null}
     * @param str4                       package URL, may be blank
     * @param sinkConfig                 sink configuration
     * @param authenticationParameters   caller authentication
     * @throws RestException on any validation or processing failure
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public void registerSink(String str, String str2, String str3, InputStream inputStream, FormDataContentDisposition formDataContentDisposition, String str4, SinkConfig sinkConfig, AuthenticationParameters authenticationParameters) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (str == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (str2 == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (str3 == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
        }
        if (sinkConfig == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
        }
        throwRestExceptionIfUnauthorizedForNamespace(str, str2, str3, "register", authenticationParameters);

        final String componentName = ComponentTypeUtils.toString(this.componentType);

        // Phase 1: the tenant must exist and the namespace must be known, either as
        // tenant/namespace or as tenant/cluster/namespace.
        try {
            worker().getBrokerAdmin().tenants().getTenantInfo(str);
            String qualifiedNamespace = str + "/" + str2;
            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(str);
            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", str, worker().getWorkerConfig().getPulsarFunctionsCluster(), str2);
                if (!namespaces.contains(qualifiedNamespaceWithCluster)) {
                    log.error("{}/{}/{} Namespace {} does not exist", str, str2, str3, str2);
                    throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
                }
            }
        } catch (PulsarAdminException.NotFoundException e) {
            log.error("{}/{}/{} Tenant {} does not exist", str, str2, str3, str);
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
        } catch (PulsarAdminException.NotAuthorizedException e) {
            log.error("{}/{}/{} Client is not authorized to operate {} on tenant", str, str2, str3, componentName);
            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
        } catch (PulsarAdminException e) {
            log.error("{}/{}/{} Issues getting tenant data", str, str2, str3, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }

        if (worker().getFunctionMetaDataManager().containsFunction(str, str2, str3)) {
            log.error("{} {}/{}/{} already exists", componentName, str, str2, str3);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", componentName, str3));
        }

        File packageFile = null;
        try {
            // Phase 2: obtain the package (if any) and validate the request against it.
            Function.FunctionDetails functionDetails;
            try {
                if (StringUtils.isNotBlank(str4)) {
                    if (Utils.hasPackageTypePrefix(str4)) {
                        packageFile = downloadPackageFile(str4);
                    } else {
                        if (!Utils.isFunctionPackageUrlSupported(str4)) {
                            throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
                        }
                        try {
                            packageFile = FunctionCommon.extractFileFromPkgURL(str4);
                        } catch (Exception e) {
                            throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentName, str4), e);
                        }
                    }
                    functionDetails = validateUpdateRequestParams(str, str2, str3, sinkConfig, packageFile);
                } else {
                    if (inputStream != null) {
                        packageFile = WorkerUtils.dumpToTmpFile(inputStream);
                    }
                    functionDetails = validateUpdateRequestParams(str, str2, str3, sinkConfig, packageFile);
                    // A non-builtin sink needs both the uploaded content and its upload details.
                    if (!FunctionCommon.isFunctionCodeBuiltin(functionDetails) && (packageFile == null || formDataContentDisposition == null)) {
                        throw new IllegalArgumentException(componentName + " Package is not provided");
                    }
                }
            } catch (Exception e) {
                log.error("Invalid register {} request @ /{}/{}/{}", componentName, str, str2, str3, e);
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }

            // Phase 3: let the runtime factory veto the sink.
            try {
                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
            } catch (Exception e) {
                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentName, str, str2, str3);
                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentName, str3, e.getMessage()));
            }

            Function.FunctionMetaData.Builder metaDataBuilder = Function.FunctionMetaData.newBuilder()
                    .setFunctionDetails(functionDetails)
                    .setCreateTime(System.currentTimeMillis())
                    .setVersion(0L);

            // Phase 4: cache the caller's auth data when authentication is enabled and data is present.
            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
                final Function.FunctionDetails detailsForAuth = functionDetails;
                worker().getFunctionRuntimeManager().getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
                    if (authenticationParameters.getClientAuthenticationDataSource() != null) {
                        try {
                            functionAuthProvider.cacheAuthData(detailsForAuth, authenticationParameters.getClientAuthenticationDataSource())
                                    .ifPresent(authData -> metaDataBuilder.setFunctionAuthSpec(
                                            Function.FunctionAuthenticationSpec.newBuilder().setData(ByteString.copyFrom(authData.getData())).build()));
                        } catch (Exception e) {
                            log.error("Error caching authentication data for {} {}/{}/{}", componentName, str, str2, str3, e);
                            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentName, str3, e.getMessage()));
                        }
                    }
                });
            }

            // Phase 5: resolve where the package will live.
            Function.PackageLocationMetaData.Builder packageLocation;
            try {
                packageLocation = getFunctionPackageLocation(metaDataBuilder.build(), str4, formDataContentDisposition, packageFile);
            } catch (Exception e) {
                log.error("Failed process {} {}/{}/{} package: ", componentName, str, str2, str3, e);
                throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
            }

            // Phase 6: submit. Exceptions from updateRequest propagate unchanged.
            metaDataBuilder.setPackageLocation(packageLocation);
            updateRequest(null, metaDataBuilder.build());
        } finally {
            // Packages referenced through a "file" URL are left in place; anything else obtained here is removed.
            if (packageFile != null && packageFile.exists() && (str4 == null || !str4.startsWith("file"))) {
                packageFile.delete();
            }
        }
    }

    /**
     * Updates an existing sink.
     *
     * <p>The work happens in sequential phases, each with its own error mapping, so that a failure
     * in a later phase is not re-labelled by the handler of an earlier one:
     * <ol>
     *   <li>argument, authorization and existence checks;</li>
     *   <li>merge of the new config into the existing one (failures map to {@code BAD_REQUEST});</li>
     *   <li>package resolution and validation (failures map to {@code BAD_REQUEST});</li>
     *   <li>runtime admission checks (failures map to {@code BAD_REQUEST});</li>
     *   <li>optional update of authentication data (failures map to {@code INTERNAL_SERVER_ERROR});</li>
     *   <li>package location resolution (failures map to {@code INTERNAL_SERVER_ERROR});</li>
     *   <li>submission of the metadata through {@code updateRequest}.</li>
     * </ol>
     * A package file obtained during the call is deleted on every exit path when it came from an
     * upload or from a package URL that does not start with {@code "file"}.
     *
     * @param str                        tenant
     * @param str2                       namespace
     * @param str3                       sink name
     * @param inputStream                uploaded package content, may be {@code null}
     * @param formDataContentDisposition upload details, may be {@code null}
     * @param str4                       package URL, may be blank
     * @param sinkConfig                 new sink configuration; tenant, namespace and name are overwritten
     * @param authenticationParameters   caller authentication
     * @param updateOptionsImpl          update options, may be {@code null}
     * @throws RestException on any validation or processing failure
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public void updateSink(String str, String str2, String str3, InputStream inputStream, FormDataContentDisposition formDataContentDisposition, String str4, SinkConfig sinkConfig, AuthenticationParameters authenticationParameters, UpdateOptionsImpl updateOptionsImpl) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (str == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
        }
        if (str2 == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
        }
        if (str3 == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink name is not provided");
        }
        if (sinkConfig == null) {
            throw new RestException(Response.Status.BAD_REQUEST, "Sink config is not provided");
        }
        throwRestExceptionIfUnauthorizedForNamespace(str, str2, str3, "update", authenticationParameters);

        final String componentName = ComponentTypeUtils.toString(this.componentType);

        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(str, str2, str3)) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", componentName, str3));
        }
        Function.FunctionMetaData existingMetaData = functionMetaDataManager.getFunctionMetaData(str, str2, str3);
        if (!InstanceUtils.calculateSubjectType(existingMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", str, str2, str3, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", componentName, str3));
        }

        SinkConfig existingConfig = SinkConfigUtils.convertFromDetails(existingMetaData.getFunctionDetails());
        sinkConfig.setTenant(str);
        sinkConfig.setNamespace(str2);
        sinkConfig.setName(str3);

        SinkConfig mergedConfig;
        try {
            mergedConfig = SinkConfigUtils.validateUpdate(existingConfig, sinkConfig);
        } catch (Exception e) {
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        if (existingConfig.equals(mergedConfig) && StringUtils.isBlank(str4) && inputStream == null) {
            log.error("{}/{}/{} Update contains no changes", str, str2, str3);
            throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
        }

        final String existingPackagePath = existingMetaData.getPackageLocation().getPackagePath();
        File packageFile = null;
        try {
            // Obtain the package to validate against. Precedence: request URL, then a stored
            // file/http location, then the upload, then builtin, then the stored package download.
            Function.FunctionDetails functionDetails;
            try {
                if (StringUtils.isNotBlank(str4)) {
                    if (Utils.hasPackageTypePrefix(str4)) {
                        packageFile = downloadPackageFile(str4);
                    } else {
                        try {
                            packageFile = FunctionCommon.extractFileFromPkgURL(str4);
                        } catch (Exception e) {
                            throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentName, str4), e);
                        }
                    }
                    functionDetails = validateUpdateRequestParams(str, str2, str3, mergedConfig, packageFile);
                } else if (existingPackagePath.startsWith("file") || existingPackagePath.startsWith("http")) {
                    try {
                        packageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath);
                    } catch (Exception e) {
                        // Report the stored path that was actually fetched; the request URL is blank in this branch.
                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentName, existingPackagePath), e);
                    }
                    functionDetails = validateUpdateRequestParams(str, str2, str3, mergedConfig, packageFile);
                } else if (inputStream != null) {
                    packageFile = WorkerUtils.dumpToTmpFile(inputStream);
                    functionDetails = validateUpdateRequestParams(str, str2, str3, mergedConfig, packageFile);
                } else if (existingPackagePath.startsWith("builtin")) {
                    functionDetails = validateUpdateRequestParams(str, str2, str3, mergedConfig, null);
                    // No package file exists in this branch, so a non-builtin result can never be satisfied.
                    if (!FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
                        throw new IllegalArgumentException(componentName + " Package is not provided");
                    }
                } else {
                    packageFile = FunctionCommon.createPkgTempFile();
                    packageFile.deleteOnExit();
                    if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
                        worker().getBrokerAdmin().packages().download(existingPackagePath, packageFile.getAbsolutePath());
                    } else {
                        WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), packageFile, existingPackagePath);
                    }
                    functionDetails = validateUpdateRequestParams(str, str2, str3, mergedConfig, packageFile);
                }
            } catch (Exception e) {
                log.error("Invalid update {} request @ /{}/{}/{}", componentName, str, str2, str3, e);
                throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
            }

            // Let the runtime factory veto the updated sink.
            try {
                worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
            } catch (Exception e) {
                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentName, str, str2, str3);
                throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentName, str3, e.getMessage()));
            }

            Function.FunctionMetaData.Builder metaDataBuilder = Function.FunctionMetaData.newBuilder()
                    .mergeFrom(existingMetaData)
                    .setFunctionDetails(functionDetails);

            // Refresh auth data only when authentication is enabled, the caller supplied auth data
            // and the update options explicitly request it.
            if (worker().getWorkerConfig().isAuthenticationEnabled()) {
                final Function.FunctionDetails detailsForAuth = functionDetails;
                worker().getFunctionRuntimeManager().getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
                    if (authenticationParameters.getClientAuthenticationDataSource() == null || updateOptionsImpl == null || !updateOptionsImpl.isUpdateAuthData()) {
                        return;
                    }
                    Optional<FunctionAuthData> existingAuthData = Optional.empty();
                    if (metaDataBuilder.hasFunctionAuthSpec()) {
                        existingAuthData = Optional.ofNullable(FunctionAuthUtils.getFunctionAuthData(Optional.ofNullable(metaDataBuilder.getFunctionAuthSpec())));
                    }
                    try {
                        Optional<FunctionAuthData> newAuthData = functionAuthProvider.updateAuthData(detailsForAuth, existingAuthData, authenticationParameters.getClientAuthenticationDataSource());
                        if (newAuthData.isPresent()) {
                            metaDataBuilder.setFunctionAuthSpec(Function.FunctionAuthenticationSpec.newBuilder().setData(ByteString.copyFrom(newAuthData.get().getData())).build());
                        } else {
                            metaDataBuilder.clearFunctionAuthSpec();
                        }
                    } catch (Exception e) {
                        log.error("Error updating authentication data for {} {}/{}/{}", componentName, str, str2, str3, e);
                        throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentName, str3, e.getMessage()));
                    }
                });
            }

            // A new package (URL or upload) gets a new location computed against the bumped
            // metadata version; otherwise the existing location is carried over.
            Function.PackageLocationMetaData.Builder packageLocation;
            if (StringUtils.isNotBlank(str4) || inputStream != null) {
                Function.FunctionMetaData builtMetaData = metaDataBuilder.build();
                try {
                    packageLocation = getFunctionPackageLocation(FunctionMetaDataUtils.incrMetadataVersion(builtMetaData, builtMetaData), str4, formDataContentDisposition, packageFile);
                } catch (Exception e) {
                    log.error("Failed process {} {}/{}/{} package: ", componentName, str, str2, str3, e);
                    throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
                }
            } else {
                packageLocation = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingMetaData.getPackageLocation());
            }

            // Submit. Exceptions from updateRequest propagate unchanged.
            metaDataBuilder.setPackageLocation(packageLocation);
            updateRequest(existingMetaData, metaDataBuilder.build());
        } finally {
            // Delete only files that came from a non-"file" request URL or from an upload.
            if (packageFile != null && packageFile.exists() && ((str4 != null && !str4.startsWith("file")) || inputStream != null)) {
                packageFile.delete();
            }
        }
    }

    /**
     * Copies a proto exception entry into the REST-facing {@link ExceptionInformation} type.
     *
     * @param exceptionInformation proto entry carrying the exception text and its epoch-millis timestamp
     * @return a new object holding the same exception string and timestamp
     */
    private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionInformation) {
        final long timestampMs = exceptionInformation.getMsSinceEpoch();
        final String exceptionText = exceptionInformation.getExceptionString();

        ExceptionInformation converted = new ExceptionInformation();
        converted.setTimestampMs(timestampMs);
        converted.setExceptionString(exceptionText);
        return converted;
    }

    /**
     * Returns the status of a single sink instance.
     *
     * @param str                      tenant
     * @param str2                     namespace
     * @param str3                     sink name
     * @param str4                     instance id as a decimal string
     * @param uri                      request URI, forwarded to the status lookup
     * @param authenticationParameters caller authentication
     * @return status data for the requested instance
     * @throws NumberFormatException   if {@code str4} is not a parsable integer
     * @throws WebApplicationException as raised by validation or the lookup (rethrown unchanged)
     * @throws RestException           {@code INTERNAL_SERVER_ERROR} for any other lookup failure
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(String str, String str2, String str3, String str4, URI uri, AuthenticationParameters authenticationParameters) {
        // Parse once; as before, a malformed id fails before validation runs.
        final int instanceId = Integer.parseInt(str4);
        componentInstanceStatusRequestValidate(str, str2, str3, instanceId, authenticationParameters);
        try {
            return new GetSinkStatus().getComponentInstanceStatus(str, str2, str3, instanceId, uri);
        } catch (WebApplicationException e) {
            // Must precede the generic handler: REST exceptions already carry the right status.
            throw e;
        } catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Status", str, str2, str3, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Returns the aggregated status of a sink.
     *
     * <p>Validates the request with {@code componentStatusRequestValidate} and then delegates
     * to {@code GetSinkStatus#getComponentStatus}.
     *
     * @param str first path component of the sink (presumably the tenant - confirm against callers)
     * @param str2 second path component of the sink (presumably the namespace)
     * @param str3 third path component of the sink (presumably the sink name)
     * @param uri the request URI, forwarded unchanged to the status lookup
     * @param authenticationParameters authentication data forwarded to request validation
     * @return the status reported for the sink
     * @throws WebApplicationException rethrown unchanged when raised by the status lookup
     * @throws RestException with status 500 for any other failure of the status lookup
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public SinkStatus getSinkStatus(String str, String str2, String str3, URI uri, AuthenticationParameters authenticationParameters) {
        componentStatusRequestValidate(str, str2, str3, authenticationParameters);

        final SinkStatus status;
        try {
            status = new GetSinkStatus().getComponentStatus(str, str2, str3, uri);
        } catch (WebApplicationException passThrough) {
            // REST errors already carry the right status code; do not wrap them.
            throw passThrough;
        } catch (Exception failure) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{str, str2, str3, failure});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
        return status;
    }

    /**
     * Returns the configuration of a registered sink.
     *
     * <p>After {@code componentStatusRequestValidate} accepts the request, the sink's stored
     * function metadata is looked up and its function details are converted to a
     * {@link SinkConfig} by {@code SinkConfigUtils.convertFromDetails}.
     *
     * @param str first lookup key of the sink (presumably the tenant - confirm against callers)
     * @param str2 second lookup key of the sink (presumably the namespace)
     * @param str3 third lookup key of the sink (presumably the sink name)
     * @param authenticationParameters authentication data forwarded to request validation
     * @return the sink configuration derived from the stored function details
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public SinkConfig getSinkInfo(String str, String str2, String str3, AuthenticationParameters authenticationParameters) {
        componentStatusRequestValidate(str, str2, str3, authenticationParameters);

        final FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
        final Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(str, str2, str3);
        final Function.FunctionDetails details = metaData.getFunctionDetails();
        return SinkConfigUtils.convertFromDetails(details);
    }

    /**
     * Lists the connectors that can be used as sinks.
     *
     * <p>Takes the result of {@code getListOfConnectors()} and keeps only those definitions
     * whose sink class is neither {@code null} nor empty.
     *
     * @return a new mutable list of the connector definitions that declare a sink class,
     *         in the order they were returned by {@code getListOfConnectors()}
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public List<ConnectorDefinition> getSinkList() {
        List<ConnectorDefinition> sinkConnectors = new ArrayList<>();
        for (ConnectorDefinition connectorDefinition : getListOfConnectors()) {
            // A connector without a sink class is source-only and must not be offered as a sink.
            if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
                sinkConnectors.add(connectorDefinition);
            }
        }
        return sinkConnectors;
    }

    /**
     * Returns the configuration field definitions of a sink known to the connectors manager.
     *
     * @param str the name passed to the connectors manager to look up the sink
     * @return the configuration field definitions of that sink
     * @throws RestException with status 404 when the connectors manager has no definition
     *         for {@code str}
     */
    @Override // org.apache.pulsar.functions.worker.service.api.Sinks
    public List<ConfigFieldDefinition> getSinkConfigDefinition(String str) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }

        final List<ConfigFieldDefinition> definitions =
                worker().getConnectorsManager().getSinkConfigDefinition(str);
        if (definitions != null) {
            return definitions;
        }
        throw new RestException(Response.Status.NOT_FOUND, "builtin sink does not exist");
    }

    /**
     * Validates a sink create/update request and converts it to function details.
     *
     * <p>The tenant, namespace and name are written into {@code sinkConfig} and missing
     * arguments are inferred. The class loader used for validation is taken from the built-in
     * connector when the archive starts with {@code "builtin"}; otherwise it is created from
     * {@code file} when one is supplied. A class loader created here from the package file is
     * always closed before returning, including when validation or conversion fails.
     *
     * @param str tenant to set on {@code sinkConfig}
     * @param str2 namespace to set on {@code sinkConfig}
     * @param str3 sink name to set on {@code sinkConfig}
     * @param sinkConfig the sink configuration to validate; it is mutated by this method
     * @param file the sink package file, or {@code null} if none was provided
     * @return the function details produced from the validated configuration
     * @throws IllegalArgumentException if the referenced built-in sink is not available, or if
     *         no class loader could be obtained from either the archive or the package file
     * @throws IOException declared by the original signature; may be raised while loading
     *         the package
     */
    private Function.FunctionDetails validateUpdateRequestParams(String str, String str2, String str3, SinkConfig sinkConfig, File file) throws IOException {
        sinkConfig.setTenant(str);
        sinkConfig.setNamespace(str2);
        sinkConfig.setName(str3);
        Utils.inferMissingArguments(sinkConfig);

        // Built-in sinks reuse the connector's class loader, which this method does not own.
        ClassLoader classLoader = null;
        String archive = sinkConfig.getArchive();
        if (!StringUtils.isEmpty(archive) && archive.startsWith("builtin")) {
            Connector connector = worker().getConnectorsManager().getConnector(archive.replaceFirst("^builtin://", ""));
            if (connector == null) {
                throw new IllegalArgumentException("Built-in sink is not available");
            }
            classLoader = connector.getClassLoader();
        }

        // Only a class loader created here from the package file may be closed here.
        boolean ownsClassLoader = false;
        try {
            if (classLoader == null && file != null) {
                classLoader = getClassLoaderFromPackage(sinkConfig.getClassName(), file, worker().getWorkerConfig().getNarExtractionDirectory());
                ownsClassLoader = true;
            }
            if (classLoader == null) {
                throw new IllegalArgumentException("Sink package is not provided");
            }
            return SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig().booleanValue()));
        } finally {
            // Runs on success and on failure, so the package class loader never leaks.
            if (ownsClassLoader) {
                ClassLoaderUtils.closeClassLoader(classLoader);
            }
        }
    }

    /**
     * Downloads a package by delegating to {@code FunctionsImpl.downloadPackageFile}
     * with this component's worker service.
     *
     * @param str identifier of the package to download, passed through unchanged
     * @return the file returned by {@code FunctionsImpl.downloadPackageFile}
     * @throws IOException propagated from the delegate
     * @throws PulsarAdminException propagated from the delegate
     */
    private File downloadPackageFile(String str) throws IOException, PulsarAdminException {
        final PulsarWorkerService workerService = worker();
        return FunctionsImpl.downloadPackageFile(workerService, str);
    }
}
