package org.springframework.scheduling.annotation;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.support.DefaultScheduledTaskObservationConvention;
import org.springframework.scheduling.support.ScheduledTaskObservationContext;
import org.springframework.scheduling.support.ScheduledTaskObservationConvention;
import org.springframework.scheduling.support.ScheduledTaskObservationDocumentation;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/spring-context-6.1.11.jar:org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.class */
public abstract class ScheduledAnnotationReactiveSupport {
    /** {@code true} when {@code reactor.core.publisher.Flux} can be loaded through this class's class loader. */
    static final boolean reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", ScheduledAnnotationReactiveSupport.class.getClassLoader());
    /** {@code true} when {@code kotlinx.coroutines.reactor.MonoKt} can be loaded through this class's class loader. */
    static final boolean coroutinesReactorPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", ScheduledAnnotationReactiveSupport.class.getClassLoader());
    /** Logger shared with the nested subscriber, which uses it to report errors signalled by scheduled publishers. */
    private static final Log logger = LogFactory.getLog(ScheduledAnnotationReactiveSupport.class);

    /* loaded from: input_file:BOOT-INF/lib/spring-context-6.1.11.jar:org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport$SubscribingRunnable.class */
    /**
     * Runnable that subscribes to a {@link Publisher} each time it is run.
     *
     * <p>Every run creates a fresh {@link Observation} and a fresh {@code TrackingSubscriber}.
     * When {@link #shouldBlock} is set, {@link #run()} additionally waits on a latch that the
     * subscriber releases once the subscription has terminated or been cancelled.
     */
    static final class SubscribingRunnable implements SchedulingAwareRunnable {
        private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention();
        private final Publisher<?> publisher;
        final boolean shouldBlock;

        @Nullable
        private final String qualifier;
        private final List<Runnable> subscriptionTrackerRegistry;
        final Supplier<ObservationRegistry> observationRegistrySupplier;
        final Supplier<ScheduledTaskObservationContext> contextSupplier;

        /**
         * @param publisher the publisher to subscribe to on every run
         * @param z whether {@link #run()} must wait for the subscription to finish
         * @param str the qualifier reported by {@link #getQualifier()}, may be {@code null}
         * @param list registry that receives each active subscriber for the lifetime of its subscription
         * @param supplier supplies the {@link ObservationRegistry} used to create the per-run observation
         * @param supplier2 supplies the observation context for each run
         */
        SubscribingRunnable(Publisher<?> publisher, boolean z, @Nullable String str, List<Runnable> list, Supplier<ObservationRegistry> supplier, Supplier<ScheduledTaskObservationContext> supplier2) {
            this.publisher = publisher;
            this.shouldBlock = z;
            this.qualifier = str;
            this.subscriptionTrackerRegistry = list;
            this.observationRegistrySupplier = supplier;
            this.contextSupplier = supplier2;
        }

        @Override
        @Nullable
        public String getQualifier() {
            return this.qualifier;
        }

        /**
         * Subscribes to the publisher and, in blocking mode, waits until the subscriber
         * releases the latch.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
         * thread is interrupted while waiting; the thread's interrupt flag is set again first
         */
        @Override
        public void run() {
            Observation observation = ScheduledTaskObservationDocumentation.TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION, this.contextSupplier, this.observationRegistrySupplier.get());
            if (!this.shouldBlock) {
                subscribe(new TrackingSubscriber(this.subscriptionTrackerRegistry, observation), observation);
                return;
            }
            CountDownLatch countDownLatch = new CountDownLatch(1);
            subscribe(new TrackingSubscriber(this.subscriptionTrackerRegistry, observation, countDownLatch), observation);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                // await() cleared the interrupt flag when it threw; set it again so the
                // executor running this task can still see that the thread was interrupted.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Registers the subscriber in the tracker registry, then subscribes it to the publisher.
         * With Reactor on the classpath the observation is also written into the Reactor
         * context under the {@code "micrometer.observation"} key.
         */
        private void subscribe(TrackingSubscriber trackingSubscriber, Observation observation) {
            // Register before subscribing so the subscriber is already tracked when signals arrive.
            this.subscriptionTrackerRegistry.add(trackingSubscriber);
            if (ScheduledAnnotationReactiveSupport.reactorPresent) {
                Flux.from(this.publisher)
                        .contextWrite(context -> context.put("micrometer.observation", observation))
                        .subscribe(trackingSubscriber);
            } else {
                this.publisher.subscribe(trackingSubscriber);
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-context-6.1.11.jar:org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport$TrackingSubscriber.class */
    /**
     * Subscriber that ignores emitted values and only tracks the lifecycle of one subscription.
     *
     * <p>It starts the {@link Observation} on subscription and stops it on termination or
     * cancellation. On {@code onError}/{@code onComplete} it removes itself from the tracker
     * registry. If a latch was supplied, it is counted down whenever the subscription ends.
     *
     * <p>As a {@link Runnable}, {@link #run()} cancels the subscription.
     * NOTE(review): presumably invoked by whoever owns the registry, e.g. on shutdown. Confirm against caller.
     */
    private static final class TrackingSubscriber implements Subscriber<Object>, Runnable {
        private final List<Runnable> subscriptionTrackerRegistry;
        private final Observation observation;

        @Nullable
        private final CountDownLatch blockingLatch;

        // Assigned in onSubscribe; null until then.
        @Nullable
        private Subscription subscription;

        /** Creates a non-blocking tracker, i.e. one without a latch. */
        TrackingSubscriber(List<Runnable> list, Observation observation) {
            this(list, observation, null);
        }

        /**
         * @param list registry this subscriber removes itself from on termination
         * @param observation observation started on subscription and stopped when the subscription ends
         * @param countDownLatch latch to count down when the subscription ends, or {@code null} for none
         */
        TrackingSubscriber(List<Runnable> list, Observation observation, @Nullable CountDownLatch countDownLatch) {
            this.subscriptionTrackerRegistry = list;
            this.observation = observation;
            this.blockingLatch = countDownLatch;
        }

        /**
         * Cancels the subscription, if one was received, and stops the observation;
         * then releases the latch, if any.
         */
        @Override
        public void run() {
            if (this.subscription != null) {
                this.subscription.cancel();
                this.observation.stop();
            }
            releaseLatch();
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            this.observation.start();
            // Values are discarded and demand is never requested again, so ask for
            // everything up front. Long.MAX_VALUE is the Reactive Streams signal for
            // unbounded demand; a smaller one-off amount would stall a long-lived
            // publisher once that many elements had been emitted.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Object obj) {
            // Emitted values are intentionally ignored.
        }

        @Override
        public void onError(Throwable th) {
            this.subscriptionTrackerRegistry.remove(this);
            ScheduledAnnotationReactiveSupport.logger.warn("Unexpected error occurred in scheduled reactive task", th);
            this.observation.error(th);
            this.observation.stop();
            releaseLatch();
        }

        @Override
        public void onComplete() {
            this.subscriptionTrackerRegistry.remove(this);
            Observation.Context context = this.observation.getContext();
            if (context instanceof ScheduledTaskObservationContext) {
                ((ScheduledTaskObservationContext) context).setComplete(true);
            }
            this.observation.stop();
            releaseLatch();
        }

        /** Counts down the blocking latch when one was supplied. */
        private void releaseLatch() {
            if (this.blockingLatch != null) {
                this.blockingLatch.countDown();
            }
        }
    }

    /**
     * Package-private no-op constructor; being non-public, it cannot be invoked from
     * outside this package.
     */
    ScheduledAnnotationReactiveSupport() {
    }

    /**
     * Determines whether the given {@code @Scheduled} method is to be handled reactively.
     *
     * <p>A method qualifies when it is a Kotlin suspending function, or when the shared
     * {@link ReactiveAdapterRegistry} has an adapter for its return type.
     *
     * @param method the candidate method
     * @return {@code true} for a suspending function or a method with an adaptable return
     * type, {@code false} otherwise
     * @throws IllegalArgumentException (via {@link Assert#isTrue}) if the method qualifies
     * but declares arguments, if the Coroutine-Reactor bridge is missing for a suspending
     * function, or if the return type does not support deferred execution
     */
    public static boolean isReactive(Method method) {
        if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
            // A suspending function compiles to a method with one extra trailing Continuation
            // parameter, so a reflective count of 1 means "declared without arguments".
            boolean declaredWithoutArguments = method.getParameterCount() == 1;
            Assert.isTrue(declaredWithoutArguments, "Kotlin suspending functions may only be annotated with @Scheduled if declared without arguments");
            Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
            return true;
        }

        ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
        if (!registry.hasAdapters()) {
            return false;
        }
        ReactiveAdapter returnTypeAdapter = registry.getAdapter(method.getReturnType());
        if (returnTypeAdapter == null) {
            return false;
        }

        boolean declaredWithoutArguments = method.getParameterCount() == 0;
        Assert.isTrue(declaredWithoutArguments, "Reactive methods may only be annotated with @Scheduled if declared without arguments");
        Assert.isTrue(returnTypeAdapter.getDescriptor().isDeferred(), "Reactive methods may only be annotated with @Scheduled if the return type supports deferred execution");
        return true;
    }

    /**
     * Builds the {@link Runnable} that subscribes to the publisher obtained from the given
     * {@code @Scheduled} method.
     *
     * <p>The runnable is created in blocking mode when the annotation declares a fixed
     * delay, either as a positive {@code fixedDelay} or a non-blank {@code fixedDelayString}.
     *
     * @param method the scheduled method
     * @param obj the target object the method is invoked on
     * @param scheduled the annotation supplying the fixed-delay settings and scheduler qualifier
     * @param supplier supplies the {@link ObservationRegistry} for per-run observations
     * @param list registry that tracks active subscriptions
     * @return the subscribing runnable
     */
    public static Runnable createSubscriptionRunnable(Method method, Object obj, Scheduled scheduled, Supplier<ObservationRegistry> supplier, List<Runnable> list) {
        Publisher<?> source = getPublisherFor(method, obj);
        boolean hasFixedDelay = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
        String schedulerQualifier = scheduled.scheduler();
        Supplier<ScheduledTaskObservationContext> observationContexts = () -> new ScheduledTaskObservationContext(obj, method);
        return new SubscribingRunnable(source, hasFixedDelay, schedulerQualifier, list, supplier, observationContexts);
    }

    /**
     * Obtains a {@link Publisher} from the given method.
     *
     * <p>A Kotlin suspending function is delegated to {@link CoroutinesUtils}. Any other
     * method is invoked reflectively without arguments and its return value is adapted
     * through the shared {@link ReactiveAdapterRegistry}. With Reactor on the classpath the
     * adapted publisher is wrapped in a {@link Flux} carrying a checkpoint that names the method.
     *
     * @param method the scheduled method
     * @param obj the target object the method is invoked on
     * @return the publisher to subscribe to
     * @throws IllegalArgumentException if the return type has no adapter or is not deferred,
     * or if the reflective invocation fails
     */
    static Publisher<?> getPublisherFor(Method method, Object obj) {
        if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
            // The reflective Parameter array is handed over as the argument array.
            // NOTE(review): presumably CoroutinesUtils disregards the Continuation slot - confirm.
            return CoroutinesUtils.invokeSuspendingFunction(method, obj, method.getParameters());
        }

        Class<?> declaredReturnType = method.getReturnType();
        ReactiveAdapter returnTypeAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(declaredReturnType);
        if (returnTypeAdapter == null) {
            throw new IllegalArgumentException("Cannot convert @Scheduled reactive method return type to Publisher");
        }
        if (!returnTypeAdapter.getDescriptor().isDeferred()) {
            throw new IllegalArgumentException("Cannot convert @Scheduled reactive method return type to Publisher: " + declaredReturnType.getSimpleName() + " is not a deferred reactive type");
        }

        Method invocable = AopUtils.selectInvocableMethod(method, obj.getClass());
        ReflectionUtils.makeAccessible(invocable);
        final String invocationFailure = "Cannot obtain a Publisher-convertible value from the @Scheduled reactive method";
        try {
            Object returned = invocable.invoke(obj);
            Publisher<?> adapted = returnTypeAdapter.toPublisher(returned);
            if (reactorPresent) {
                String description = "@Scheduled '" + method.getName() + "()' in '" + method.getDeclaringClass().getName() + "'";
                return Flux.from(adapted).checkpoint(description);
            } else {
                return adapted;
            }
        } catch (IllegalAccessException accessFailure) {
            throw new IllegalArgumentException(invocationFailure, accessFailure);
        } catch (InvocationTargetException wrapped) {
            // Report the exception thrown by the method itself rather than the reflective wrapper.
            throw new IllegalArgumentException(invocationFailure, wrapped.getTargetException());
        }
    }
}
