package org.axonframework.queryhandling;

import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.tracing.TestSpanFactory;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/queryhandling/SimpleQueryUpdateEmitterTest.class */
/**
 * Tests for {@link SimpleQueryUpdateEmitter}.
 * <p>
 * Every test registers an update handler for a subscription query named {@code "chatMessages"}, emits one or
 * more updates through the emitter, and then verifies with a {@link StepVerifier} which updates reach the
 * registration's update stream and whether that stream completes.
 * <p>
 * Tests suffixed {@code OldApi} (and the {@code _WithBackpressure} test) use the
 * {@code registerUpdateHandler} overload that takes a {@link SubscriptionQueryBackpressure}; the others use the
 * overload that only takes a buffer size and subscribe to the update stream once before emitting.
 */
class SimpleQueryUpdateEmitterTest {

    private static final String QUERY_PAYLOAD = "some-payload";
    private static final String QUERY_NAME = "chatMessages";
    private static final String TEXT_UPDATE = "some-awesome-text";

    private static final int DEFAULT_BUFFER_SIZE = 1024;
    private static final int CONCURRENT_BUFFER_SIZE = 128;
    private static final int CONCURRENT_UPDATE_COUNT = 100;

    /** Upper bound for the concurrent tests, so a lost update fails the test instead of hanging the build. */
    private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(10);
    /** How long the cancellation tests wait to observe that no completion signal arrives. */
    private static final Duration NO_COMPLETION_WINDOW = Duration.ofMillis(500L);

    private final TestSpanFactory spanFactory = new TestSpanFactory();
    private final SimpleQueryUpdateEmitter testSubject =
            SimpleQueryUpdateEmitter.builder().spanFactory(spanFactory).build();

    /** An update emitted before {@code complete()} is delivered, after which the stream completes. */
    @Test
    void completingRegistrationOldApi() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                DEFAULT_BUFFER_SIZE
        );

        testSubject.emit(any -> true, TEXT_UPDATE);
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(TEXT_UPDATE)
                    .verifyComplete();
    }

    /** One hundred updates emitted from a thread pool must all arrive on the update stream. */
    @Test
    void concurrentUpdateEmitting() {
        UpdateHandlerRegistration<Object> registration = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                CONCURRENT_BUFFER_SIZE
        );

        ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            for (int i = 0; i < CONCURRENT_UPDATE_COUNT; i++) {
                executors.submit(() -> testSubject.emit(any -> true, "Update"));
            }
        } finally {
            // Already submitted tasks still run; this only stops the pool from accepting more work.
            executors.shutdown();
        }

        StepVerifier.create(registration.getUpdates())
                    .expectNextCount(CONCURRENT_UPDATE_COUNT)
                    .then(() -> testSubject.complete(any -> true))
                    .expectComplete()
                    .verify(VERIFY_TIMEOUT);
    }

    /** Same as {@link #concurrentUpdateEmitting()}, but registering through the backpressure overload. */
    @Test
    void concurrentUpdateEmitting_WithBackpressure() {
        UpdateHandlerRegistration<Object> registration = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                CONCURRENT_BUFFER_SIZE
        );

        ExecutorService executors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            for (int i = 0; i < CONCURRENT_UPDATE_COUNT; i++) {
                executors.submit(() -> testSubject.emit(any -> true, "Update"));
            }
        } finally {
            // Already submitted tasks still run; this only stops the pool from accepting more work.
            executors.shutdown();
        }

        StepVerifier.create(registration.getUpdates())
                    .expectNextCount(CONCURRENT_UPDATE_COUNT)
                    .then(() -> testSubject.complete(any -> true))
                    .expectComplete()
                    .verify(VERIFY_TIMEOUT);
    }

    /** Cancelling the registration leaves the stream open: the update arrives but no completion follows. */
    @Test
    void cancelingRegistrationDoesNotCompleteFluxOfUpdatesOldApi() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                SubscriptionQueryBackpressure.defaultBackpressure(),
                DEFAULT_BUFFER_SIZE
        );

        testSubject.emit(any -> true, TEXT_UPDATE);
        result.getRegistration().cancel();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(TEXT_UPDATE)
                    .verifyTimeout(NO_COMPLETION_WINDOW);
    }

    /** An update emitted before {@code complete()} is delivered, after which the stream completes. */
    @Test
    void completingRegistration() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, TEXT_UPDATE);
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(TEXT_UPDATE)
                    .verifyComplete();
    }

    /** Emitting an update records completed {@code emit} (INTERNAL) and {@code doEmit} (DISPATCH) spans. */
    @Test
    void queryUpdateEmitterIsTraced() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, TEXT_UPDATE);
        result.complete();

        spanFactory.verifySpanCompleted("SimpleQueryUpdateEmitter.emit");
        spanFactory.verifySpanHasType("SimpleQueryUpdateEmitter.emit", TestSpanFactory.TestSpanType.INTERNAL);
        spanFactory.verifySpanCompleted("SimpleQueryUpdateEmitter.doEmit");
        spanFactory.verifySpanHasType("SimpleQueryUpdateEmitter.doEmit", TestSpanFactory.TestSpanType.DISPATCH);
    }

    /** With an {@code Integer} update type, a {@code String} update is dropped and only the integer arrives. */
    @Test
    void differentUpdateAreDisambiguatedAndWrongTypesAreFilteredBasedOnQueryTypes() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(Integer.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, TEXT_UPDATE);
        testSubject.emit(any -> true, 1234);
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(1234)
                    .verifyComplete();
    }

    /** With a multiple-instances update type, only the array and the list updates are delivered. */
    @Test
    void updateResponseTypeFilteringWorksForMultipleInstanceOfWithArrayAndList() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.multipleInstancesOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        emitUpdatesOfMixedTypes();
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNextMatches(payload -> CoreMatchers.equalTo(new String[]{"array-item-1", "array-item-2"})
                                                              .matches(payload))
                    .expectNextMatches(payload -> CoreMatchers.equalTo(Arrays.asList("list-item-1", "list-item-2"))
                                                              .matches(payload))
                    .verifyComplete();
    }

    /** With an optional update type, only the two {@link Optional} updates are delivered. */
    @Test
    void updateResponseTypeFilteringWorksForOptionaInstanceOf() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.optionalInstanceOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        emitUpdatesOfMixedTypes();
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(Optional.of("optional-payload"), Optional.empty())
                    .verifyComplete();
    }

    /** With a publisher update type, only the {@code Flux}, {@code Mono} and empty {@code Mono} are delivered. */
    @Test
    void updateResponseTypeFilteringWorksForPublisherOf() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.publisherOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        emitUpdatesOfMixedTypes();
        testSubject.emit(any -> true, Mono.empty());
        result.complete();

        // Each delivered payload is itself a Publisher, so its content is checked with a nested StepVerifier.
        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNextMatches(payload -> {
                        try {
                            StepVerifier.<Object>create((Publisher<?>) payload)
                                        .expectNext("flux-item-1", "flux-item-2")
                                        .verifyComplete();
                            return true;
                        } catch (Exception e) {
                            return false;
                        }
                    })
                    .expectNextMatches(payload -> {
                        try {
                            StepVerifier.<Object>create((Publisher<?>) payload)
                                        .expectNext("mono-item")
                                        .verifyComplete();
                            return true;
                        } catch (Exception e) {
                            return false;
                        }
                    })
                    .expectNextMatches(payload -> {
                        try {
                            StepVerifier.create((Publisher<?>) payload).verifyComplete();
                            return true;
                        } catch (Exception e) {
                            return false;
                        }
                    })
                    .verifyComplete();
    }

    /** Two list updates are both delivered, in emission order. */
    @Test
    void multipleInstanceUpdatesAreDelivered() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.multipleInstancesOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, Arrays.asList("text1", "text2"));
        testSubject.emit(any -> true, Arrays.asList("text3", "text4"));
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(Arrays.asList("text1", "text2"), Arrays.asList("text3", "text4"))
                    .verifyComplete();
    }

    /** Two {@link Optional} updates are both delivered, in emission order. */
    @Test
    void optionalUpdatesAreDelivered() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.optionalInstanceOf(String.class),
                                                      ResponseTypes.optionalInstanceOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, Optional.of("text1"));
        testSubject.emit(any -> true, Optional.of("text2"));
        result.complete();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(Optional.of("text1"), Optional.of("text2"))
                    .verifyComplete();
    }

    /** Cancelling the registration leaves the stream open: the update arrives but no completion follows. */
    @Test
    void cancelingRegistrationDoesNotCompleteFluxOfUpdates() {
        UpdateHandlerRegistration<Object> result = testSubject.registerUpdateHandler(
                new GenericSubscriptionQueryMessage<>(QUERY_PAYLOAD,
                                                      QUERY_NAME,
                                                      ResponseTypes.multipleInstancesOf(String.class),
                                                      ResponseTypes.instanceOf(String.class)),
                DEFAULT_BUFFER_SIZE
        );
        result.getUpdates().subscribe();

        testSubject.emit(any -> true, TEXT_UPDATE);
        result.getRegistration().cancel();

        StepVerifier.create(result.getUpdates().map(update -> update.getPayload()))
                    .expectNext(TEXT_UPDATE)
                    .verifyTimeout(NO_COMPLETION_WINDOW);
    }

    /**
     * Emits, to every registered query, one update of each payload shape used by the response-type filtering
     * tests, in this fixed order: plain {@code String}, {@code Integer}, present {@link Optional}, empty
     * {@link Optional}, {@code String[]}, {@code List<String>}, {@link Flux} and {@link Mono}.
     */
    private void emitUpdatesOfMixedTypes() {
        testSubject.emit(any -> true, TEXT_UPDATE);
        testSubject.emit(any -> true, 1234);
        testSubject.emit(any -> true, Optional.of("optional-payload"));
        testSubject.emit(any -> true, Optional.empty());
        testSubject.emit(any -> true, new String[]{"array-item-1", "array-item-2"});
        testSubject.emit(any -> true, Arrays.asList("list-item-1", "list-item-2"));
        testSubject.emit(any -> true, Flux.just("flux-item-1", "flux-item-2"));
        testSubject.emit(any -> true, Mono.just("mono-item"));
    }
}
