package io.fluxcapacitor.javaclient.test;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Data;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerConfiguration;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.IdentityProvider;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.configuration.DefaultFluxCapacitor;
import io.fluxcapacitor.javaclient.configuration.FluxCapacitorBuilder;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.configuration.client.InMemoryClient;
import io.fluxcapacitor.javaclient.persisting.search.Search;
import io.fluxcapacitor.javaclient.publishing.DispatchInterceptor;
import io.fluxcapacitor.javaclient.scheduling.DefaultScheduler;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.client.InMemorySchedulingClient;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.User;
import io.fluxcapacitor.javaclient.tracking.handling.authentication.UserProvider;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/test/TestFixture.class */
public class TestFixture implements Given, When {
    /** Logger shared by all fixture instances. */
    private static final Logger log = LoggerFactory.getLogger(TestFixture.class);
    /** Value copied into {@code resultTimeout} when a fixture is created. */
    public static Duration defaultResultTimeout = Duration.ofSeconds(10);
    /** Value copied into {@code consumerTimeout} when a fixture is created. */
    public static Duration defaultConsumerTimeout = Duration.ofSeconds(30);
    /** The Flux Capacitor instance that all given/when actions are applied to. */
    private final FluxCapacitor fluxCapacitor;
    /** {@code true} for fixtures made via {@code create(...)}, {@code false} for {@code createAsync(...)}. */
    private final boolean synchronous;
    /** Registration of the handlers supplied at construction; cancelled at the end of the when phase. */
    private final Registration registration;
    /** Interceptor that tags dispatched messages and tracks what consumers still have to process. */
    private final GivenWhenThenInterceptor interceptor;
    /** Set to {@code true} when the when phase starts; messages dispatched before that get the ignore tag. */
    private volatile boolean collectingResults;
    /** Maximum time to wait for a dispatch result in asynchronous mode. */
    private Duration resultTimeout = defaultResultTimeout;
    /** Maximum time to wait for consumers to finish in asynchronous mode. */
    private Duration consumerTimeout = defaultConsumerTimeout;
    /** Dispatched messages still pending per consumer; this map is also the monitor used to wait for consumers. */
    private final Map<ConsumerConfiguration, List<Message>> consumers = new ConcurrentHashMap<>();
    /** Commands recorded by the interceptor and handed to the result validator. */
    private final List<Message> commands = new CopyOnWriteArrayList<>();
    /** Events recorded by the interceptor and handed to the result validator. */
    private final List<Message> events = new CopyOnWriteArrayList<>();
    /** Schedules recorded by the interceptor and handed to the result validator. */
    private final List<Schedule> schedules = new CopyOnWriteArrayList<>();
    /** Exceptions thrown by handlers, recorded by the interceptor and handed to the result validator. */
    private final List<Throwable> exceptions = new CopyOnWriteArrayList<>();

    /* renamed from: io.fluxcapacitor.javaclient.test.TestFixture$1, reason: invalid class name */
    /* loaded from: input_file:io/fluxcapacitor/javaclient/test/TestFixture$1.class */
    /**
     * Lookup table from {@link MessageType} ordinals to small integer case labels: COMMAND maps to 1, EVENT to 2
     * and SCHEDULE to 3. Every other ordinal keeps the array default of 0.
     * NOTE(review): the synthetic markers suggest this is the compiler-generated helper for a switch over
     * MessageType, recovered by a decompiler - confirm before editing by hand.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$fluxcapacitor$common$MessageType = new int[MessageType.values().length];

        static {
            // Each constant is resolved in its own try block so that one missing constant does not prevent
            // the remaining entries from being filled in.
            try {
                $SwitchMap$io$fluxcapacitor$common$MessageType[MessageType.COMMAND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Constant not present at runtime: its slot stays 0.
            }
            try {
                $SwitchMap$io$fluxcapacitor$common$MessageType[MessageType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Constant not present at runtime: its slot stays 0.
            }
            try {
                $SwitchMap$io$fluxcapacitor$common$MessageType[MessageType.SCHEDULE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Constant not present at runtime: its slot stays 0.
            }
        }
    }

    /* loaded from: input_file:io/fluxcapacitor/javaclient/test/TestFixture$GivenWhenThenInterceptor.class */
    protected class GivenWhenThenInterceptor implements DispatchInterceptor, BatchInterceptor, HandlerInterceptor {
        /** Metadata key added by {@code trace(Object)} to the message that is put under test. */
        private static final String TRACE_TAG = "$givenWhenThen.trace";
        /** Metadata key added to dispatched messages that must not be recorded as results. */
        private static final String IGNORE_TAG = "$givenWhenThen.ignore";
        /** Messages kept per message type so that trackers registered later can be seeded with them. */
        private final Map<MessageType, List<Message>> publishedSchedules = new ConcurrentHashMap<>();

        /** Creates an interceptor bound to the enclosing fixture; no further initialisation is needed. */
        protected GivenWhenThenInterceptor() {
        }

        /**
         * Returns the given object as a {@link Message} carrying the trace tag.
         *
         * @param obj a {@link Message}, or a payload that is wrapped in a new message with empty metadata
         * @return a copy of the message whose metadata contains the trace tag with value {@code "true"}
         */
        protected Message trace(Object obj) {
            Message source;
            if (obj instanceof Message) {
                source = (Message) obj;
            } else {
                source = new Message(obj, Metadata.empty());
            }
            return source.withMetadata(source.getMetadata().with(TRACE_TAG, "true"));
        }

        /**
         * Wraps a dispatch function so that every outgoing message is tagged, tracked and, where applicable,
         * recorded as a result of the when phase.
         *
         * <p>For each dispatched message the returned function:
         * <ol>
         * <li>adds the ignore tag when results are not being collected yet, or when the message currently being
         * handled is a non-schedule message that carries the ignore tag;</li>
         * <li>removes the trace tag when the message currently being handled carries it;</li>
         * <li>remembers schedules so that trackers registered later can be seeded with them;</li>
         * <li>adds the message to the pending list of every consumer with a matching message type and type
         * filter;</li>
         * <li>records commands, events and schedules that carry neither tag;</li>
         * <li>delegates to the wrapped function.</li>
         * </ol>
         *
         * @param function    the dispatch function to delegate to
         * @param messageType the type of the messages being dispatched
         * @return the decorated dispatch function
         */
        public Function<Message, SerializedMessage> interceptDispatch(Function<Message, SerializedMessage> function, MessageType messageType) {
            return dispatched -> {
                Message tagged = dispatched;
                if (!TestFixture.this.collectingResults) {
                    tagged = tagged.withMetadata(tagged.getMetadata().with(IGNORE_TAG, "true"));
                }
                DeserializingMessage current = DeserializingMessage.getCurrent();
                if (current != null) {
                    if (current.getMessageType() != MessageType.SCHEDULE && current.getMetadata().containsKey(IGNORE_TAG)) {
                        tagged = tagged.withMetadata(tagged.getMetadata().with(IGNORE_TAG, "true"));
                    }
                    if (current.getMetadata().containsKey(TRACE_TAG)) {
                        tagged = tagged.withMetadata(tagged.getMetadata().without(TRACE_TAG));
                    }
                }
                // Freeze the result so it can be captured by the nested lambdas below.
                final Message message = tagged;
                if (messageType == MessageType.SCHEDULE) {
                    addMessage(this.publishedSchedules.computeIfAbsent(MessageType.SCHEDULE, type -> new CopyOnWriteArrayList<>()), message);
                }
                synchronized (TestFixture.this.consumers) {
                    for (Map.Entry<ConsumerConfiguration, List<Message>> entry : TestFixture.this.consumers.entrySet()) {
                        ConsumerConfiguration configuration = entry.getKey();
                        if (configuration.getMessageType() != messageType) {
                            continue;
                        }
                        // A consumer without a type filter receives every message of its type.
                        String typeFilter = configuration.getTypeFilter();
                        if (typeFilter == null || message.getPayload().getClass().getName().matches(typeFilter)) {
                            addMessage(entry.getValue(), message);
                        }
                    }
                }
                if (!message.getMetadata().containsAnyKey(new String[]{IGNORE_TAG, TRACE_TAG})) {
                    switch (messageType) {
                        case COMMAND:
                            TestFixture.this.registerCommand(message);
                            break;
                        case EVENT:
                            TestFixture.this.registerEvent(message);
                            break;
                        case SCHEDULE:
                            TestFixture.this.registerSchedule((Schedule) message);
                            break;
                        default:
                            // Other message types are not recorded as results.
                            break;
                    }
                }
                return function.apply(message);
            };
        }

        /**
         * Appends a message to the given list. When the message is a {@link Schedule}, any schedule already in
         * the list with an equal schedule id is removed first, so the new one replaces it.
         *
         * @param list    the list to add to
         * @param message the message to append
         */
        protected void addMessage(List<Message> list, Message message) {
            if (message instanceof Schedule) {
                Schedule incoming = (Schedule) message;
                list.removeIf(existing -> existing instanceof Schedule
                        && ((Schedule) existing).getScheduleId().equals(incoming.getScheduleId()));
            }
            list.add(message);
        }

        /**
         * Wraps a batch consumer so that messages are removed from the tracker's pending list once their batch
         * has been processed.
         *
         * <p>If the tracker's configuration has no pending list yet, one is created and seeded with the
         * remembered messages of the configuration's message type that pass its type filter.
         *
         * @param consumer the batch consumer to delegate to
         * @param tracker  the tracker whose configuration identifies the pending list
         * @return the decorated batch consumer
         */
        public Consumer<MessageBatch> intercept(Consumer<MessageBatch> consumer, Tracker tracker) {
            List<Message> pending = TestFixture.this.consumers.computeIfAbsent(tracker.getConfiguration(), configuration ->
                    this.publishedSchedules.getOrDefault(configuration.getMessageType(), Collections.emptyList()).stream()
                            .filter(candidate -> {
                                String typeFilter = configuration.getTypeFilter();
                                return typeFilter == null
                                        || candidate.getPayload().getClass().getName().matches(typeFilter);
                            })
                            .collect(Collectors.toCollection(CopyOnWriteArrayList::new)));
            return batch -> {
                consumer.accept(batch);
                Collection<?> processedIds = batch.getMessages().stream()
                        .map(serialized -> serialized.getMessageId())
                        .collect(Collectors.toSet());
                pending.removeIf(candidate -> processedIds.contains(candidate.getMessageId()));
                TestFixture.this.checkConsumers();
            };
        }

        /**
         * Wraps a handler function so that handler exceptions are recorded and locally handled commands and
         * queries are removed from the consumers' pending lists.
         *
         * <p>Exceptions thrown by the wrapped function are registered with the fixture and rethrown unchanged.
         * Whether or not handling succeeded, a command or query handled by a local handler method is removed from
         * the pending list of every consumer of that message type, after which the consumers are re-checked.
         *
         * @param function the handler function to delegate to
         * @param handler  the handler that will process the message
         * @param str      not used by this interceptor
         * @return the decorated handler function
         */
        public Function<DeserializingMessage, Object> interceptHandling(Function<DeserializingMessage, Object> function, Handler<DeserializingMessage> handler, String str) {
            return deserializingMessage -> {
                try {
                    return function.apply(deserializingMessage);
                } catch (Exception e) {
                    TestFixture.this.registerException(e);
                    throw e;
                } finally {
                    // Runs on both the normal and the exceptional path.
                    MessageType handledType = deserializingMessage.getMessageType();
                    boolean requestType = handledType == MessageType.COMMAND || handledType == MessageType.QUERY;
                    if (requestType && ClientUtils.isLocalHandlerMethod(handler.getTarget().getClass(), handler.getMethod(deserializingMessage))) {
                        synchronized (TestFixture.this.consumers) {
                            for (Map.Entry<ConsumerConfiguration, List<Message>> entry : TestFixture.this.consumers.entrySet()) {
                                if (entry.getKey().getMessageType() == handledType) {
                                    entry.getValue().removeIf(pending -> pending.getMessageId()
                                            .equals(deserializingMessage.getSerializedObject().getMessageId()));
                                }
                            }
                        }
                        TestFixture.this.checkConsumers();
                    }
                }
            };
        }

        /**
         * Drops the pending list of the given tracker's configuration and re-checks the remaining consumers.
         *
         * @param tracker the tracker that is shutting down
         */
        public void shutdown(Tracker tracker) {
            ConsumerConfiguration stopped = tracker.getConfiguration();
            TestFixture.this.consumers.remove(stopped);
            TestFixture.this.checkConsumers();
        }
    }

    /**
     * Creates a synchronous fixture with a default builder and the given handlers.
     *
     * @param objArr the handlers to register
     * @return the new fixture
     */
    public static TestFixture create(Object... objArr) {
        FluxCapacitorBuilder defaultBuilder = (FluxCapacitorBuilder) DefaultFluxCapacitor.builder();
        return create(defaultBuilder, objArr);
    }

    /**
     * Creates a synchronous fixture with the given builder and handlers.
     *
     * @param fluxCapacitorBuilder the builder used to build the fixture's Flux Capacitor
     * @param objArr               the handlers to register
     * @return the new fixture
     */
    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Object... objArr) {
        Function<FluxCapacitor, List<?>> handlerFactory = ignored -> Arrays.asList(objArr);
        return create(fluxCapacitorBuilder, handlerFactory);
    }

    /**
     * Creates a synchronous fixture with a default builder; the handlers are produced from the built instance.
     *
     * @param function produces the handlers to register, given the fixture's Flux Capacitor
     * @return the new fixture
     */
    public static TestFixture create(Function<FluxCapacitor, List<?>> function) {
        FluxCapacitorBuilder defaultBuilder = (FluxCapacitorBuilder) DefaultFluxCapacitor.builder();
        return create(defaultBuilder, function);
    }

    /**
     * Creates a synchronous fixture backed by a new in-memory client.
     *
     * @param fluxCapacitorBuilder the builder used to build the fixture's Flux Capacitor
     * @param function             produces the handlers to register, given the fixture's Flux Capacitor
     * @return the new fixture
     */
    public static TestFixture create(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> function) {
        final boolean synchronousMode = true;
        return new TestFixture(fluxCapacitorBuilder, function, InMemoryClient.newInstance(), synchronousMode);
    }

    /**
     * Creates an asynchronous fixture with a default builder and the given handlers.
     *
     * @param objArr the handlers to register
     * @return the new fixture
     */
    public static TestFixture createAsync(Object... objArr) {
        FluxCapacitorBuilder defaultBuilder = (FluxCapacitorBuilder) DefaultFluxCapacitor.builder();
        return createAsync(defaultBuilder, objArr);
    }

    /**
     * Creates an asynchronous fixture with the given builder and handlers.
     *
     * @param fluxCapacitorBuilder the builder used to build the fixture's Flux Capacitor
     * @param objArr               the handlers to register
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Object... objArr) {
        Function<FluxCapacitor, List<?>> handlerFactory = ignored -> Arrays.asList(objArr);
        return createAsync(fluxCapacitorBuilder, handlerFactory);
    }

    /**
     * Creates an asynchronous fixture with a default builder; the handlers are produced from the built instance.
     *
     * @param function produces the handlers to register, given the fixture's Flux Capacitor
     * @return the new fixture
     */
    public static TestFixture createAsync(Function<FluxCapacitor, List<?>> function) {
        FluxCapacitorBuilder defaultBuilder = (FluxCapacitorBuilder) DefaultFluxCapacitor.builder();
        return createAsync(defaultBuilder, function);
    }

    /**
     * Creates an asynchronous fixture backed by a new in-memory client.
     *
     * @param fluxCapacitorBuilder the builder used to build the fixture's Flux Capacitor
     * @param function             produces the handlers to register, given the fixture's Flux Capacitor
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> function) {
        final boolean synchronousMode = false;
        return new TestFixture(fluxCapacitorBuilder, function, InMemoryClient.newInstance(), synchronousMode);
    }

    /**
     * Creates an asynchronous fixture that uses the supplied client instead of a new in-memory one.
     *
     * @param fluxCapacitorBuilder the builder used to build the fixture's Flux Capacitor
     * @param client               the client the fixture should wrap
     * @param objArr               the handlers to register
     * @return the new fixture
     */
    public static TestFixture createAsync(FluxCapacitorBuilder fluxCapacitorBuilder, Client client, Object... objArr) {
        Function<FluxCapacitor, List<?>> handlerFactory = ignored -> Arrays.asList(objArr);
        return new TestFixture(fluxCapacitorBuilder, handlerFactory, client, false);
    }

    /**
     * Builds the fixture's Flux Capacitor, fixes its clock at the current instant and registers the handlers.
     *
     * <p>When {@code UserProvider.defaultUserSupplier} is non-null it is wrapped in a {@code TestUserProvider}
     * and registered on the builder. The shutdown hook is disabled and this fixture's interceptor is added as
     * dispatch, batch and handler interceptor before the instance is built around a {@code TestClient}.
     *
     * @param fluxCapacitorBuilder the builder to configure and build
     * @param function             produces the handlers to register, given the built instance
     * @param client               the client to wrap
     * @param z                    {@code true} for a synchronous fixture, {@code false} for an asynchronous one
     */
    protected TestFixture(FluxCapacitorBuilder fluxCapacitorBuilder, Function<FluxCapacitor, List<?>> function, Client client, boolean z) {
        this.synchronous = z;
        Optional<TestUserProvider> testUserProvider =
                Optional.ofNullable(UserProvider.defaultUserSupplier).map(TestUserProvider::new);
        FluxCapacitorBuilder builder = fluxCapacitorBuilder;
        if (testUserProvider.isPresent()) {
            UserProvider wrapped = testUserProvider.get();
            builder = builder.registerUserSupplier(wrapped);
        }
        this.interceptor = new GivenWhenThenInterceptor();
        this.fluxCapacitor = new TestFluxCapacitor(builder
                .disableShutdownHook()
                .addDispatchInterceptor(this.interceptor, new MessageType[0])
                .addBatchInterceptor(this.interceptor, new MessageType[0])
                .addHandlerInterceptor(this.interceptor, new MessageType[0])
                .build(new TestClient(client)));
        Clock frozenClock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
        withClock(frozenClock);
        List<?> handlers = function.apply(this.fluxCapacitor);
        this.registration = registerHandlers(handlers);
    }

    /**
     * Registers the given handlers; convenience overload of {@link #registerHandlers(List)}.
     *
     * @param objArr the handlers to register
     * @return a registration for the registered handlers
     */
    public Registration registerHandlers(Object... objArr) {
        List<Object> handlers = Arrays.asList(objArr);
        return registerHandlers(handlers);
    }

    /**
     * Registers the given handlers with the fixture's Flux Capacitor.
     *
     * <p>A warning is logged for every handler whose class was already seen earlier in the list. An
     * asynchronous fixture delegates to {@code FluxCapacitor.registerHandlers}. A synchronous fixture registers
     * each handler directly on the command, query, event and error gateways and on the event store, and also on
     * the scheduler when that is a {@link DefaultScheduler}; otherwise a warning is logged.
     *
     * @param list the handlers to register
     * @return a registration combining all registrations made, or a no-op registration for an empty list
     */
    public Registration registerHandlers(List<?> list) {
        if (list.isEmpty()) {
            return Registration.noOp();
        }
        // The map itself is not needed; the merge function only runs for duplicate classes and logs them.
        list.stream().collect(Collectors.toMap(Object::getClass, Function.identity(), (first, duplicate) -> {
            log.warn("Handler of type {} is registered more than once. Please make sure this is intentional.", first.getClass());
            return first;
        }));
        if (!this.synchronous) {
            return getFluxCapacitor().registerHandlers(list);
        }
        FluxCapacitor fluxCapacitor = getFluxCapacitor();
        HandlerConfiguration defaultHandlerConfiguration = HandlerConfiguration.defaultHandlerConfiguration();
        Registration registration = (Registration) fluxCapacitor.apply(ignored ->
                list.stream()
                        .flatMap(handler -> Stream.<Registration>of(
                                fluxCapacitor.commandGateway().registerHandler(handler, defaultHandlerConfiguration),
                                fluxCapacitor.queryGateway().registerHandler(handler, defaultHandlerConfiguration),
                                fluxCapacitor.eventGateway().registerHandler(handler, defaultHandlerConfiguration),
                                fluxCapacitor.eventStore().registerHandler(handler, defaultHandlerConfiguration),
                                fluxCapacitor.errorGateway().registerHandler(handler, defaultHandlerConfiguration)))
                        .reduce(Registration::merge)
                        .orElse(Registration.noOp()));
        if (fluxCapacitor.scheduler() instanceof DefaultScheduler) {
            // Explicit downcast: only DefaultScheduler is used for local schedule handlers here.
            DefaultScheduler scheduler = (DefaultScheduler) fluxCapacitor.scheduler();
            registration = registration.merge((Registration) fluxCapacitor.apply(ignored ->
                    list.stream()
                            .<Registration>map(handler -> scheduler.registerHandler(handler, defaultHandlerConfiguration))
                            .reduce(Registration::merge)
                            .orElse(Registration.noOp())));
        } else {
            log.warn("Could not register local schedule handlers");
        }
        return registration;
    }

    /**
     * Replaces the clock of the fixture's Flux Capacitor and, when the scheduling client is an
     * {@link InMemorySchedulingClient}, the clock of that client as well. For any other scheduling client a
     * warning is logged because its clock cannot be updated.
     *
     * @param clock the clock to use from now on
     * @return this fixture
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public Given withClock(Clock clock) {
        return (Given) getFluxCapacitor().apply(fluxCapacitor -> {
            fluxCapacitor.withClock(clock);
            // Held as Object so the runtime type check is meaningful whatever the declared return type is.
            Object schedulingClient = fluxCapacitor.client().getSchedulingClient();
            if (schedulingClient instanceof InMemorySchedulingClient) {
                ((InMemorySchedulingClient) schedulingClient).setClock(clock);
            } else {
                log.warn("Could not update clock of scheduling client. Timing tests may not work.");
            }
            return this;
        });
    }

    /**
     * Returns the clock currently used by the fixture's Flux Capacitor.
     *
     * @return the current clock
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public Clock getClock() {
        FluxCapacitor instance = getFluxCapacitor();
        return instance.clock();
    }

    /**
     * Returns the identity provider currently used by the fixture's Flux Capacitor.
     *
     * @return the current identity provider
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public IdentityProvider getIdentityProvider() {
        FluxCapacitor instance = getFluxCapacitor();
        return instance.identityProvider();
    }

    /**
     * Replaces the identity provider of the fixture's Flux Capacitor.
     *
     * @param identityProvider the identity provider to use from now on
     * @return this fixture
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public Given withIdentityProvider(IdentityProvider identityProvider) {
        FluxCapacitor instance = getFluxCapacitor();
        return (Given) instance.apply(applied -> {
            applied.withIdentityProvider(identityProvider);
            return this;
        });
    }

    /**
     * Sends the given commands as a precondition and waits until all of them have completed.
     *
     * @param objArr the commands to send; collections and arrays are flattened into individual commands
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenCommands(Object... objArr) {
        return given(fluxCapacitor -> {
            CompletableFuture<?>[] pendingResults = flatten(objArr)
                    .map(command -> fluxCapacitor.commandGateway().send(command))
                    .toArray(CompletableFuture[]::new);
            getDispatchResult(CompletableFuture.allOf(pendingResults));
        });
    }

    /**
     * Sends the given commands as a precondition after adding the given user to the metadata of each.
     *
     * @param user   the user to add to each command
     * @param objArr the commands to send
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenCommandsByUser(User user, Object... objArr) {
        Object[] commandsWithUser = addUser(user, objArr);
        return givenCommands(commandsWithUser);
    }

    /**
     * Stores the given events for an aggregate as a precondition.
     *
     * @param str    the aggregate id
     * @param objArr the events to store
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenDomainEvents(String str, Object... objArr) {
        return given(instance -> publishDomainEvents(str, instance, objArr));
    }

    /**
     * Publishes the given events on the event gateway as a precondition.
     *
     * @param objArr the events to publish; collections and arrays are flattened into individual events
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenEvents(Object... objArr) {
        return given(instance -> flatten(objArr).forEach(event -> instance.eventGateway().publish(event)));
    }

    /**
     * Indexes a single document in the document store as a precondition. The arguments are passed to
     * {@code documentStore().index(...)} in the order given.
     *
     * @param obj     the document to index
     * @param str     second argument of the index call
     * @param str2    third argument of the index call
     * @param instant fourth argument of the index call
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenDocument(Object obj, String str, String str2, Instant instant) {
        return given(instance -> instance.documentStore().index(obj, str, str2, instant));
    }

    /**
     * Indexes the given documents in the document store as a precondition.
     *
     * @param str    second argument of the index call
     * @param objArr the documents to index, passed as a single list
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenDocuments(String str, Object... objArr) {
        return given(instance -> instance.documentStore().index(Arrays.asList(objArr), str));
    }

    /**
     * Moves the fixture's clock to the given instant as a precondition.
     *
     * @param instant the instant the clock should be fixed at
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenTimeAdvancesTo(Instant instant) {
        return given(ignored -> advanceTimeTo(instant));
    }

    /**
     * Moves the fixture's clock forward by the given duration as a precondition.
     *
     * @param duration the amount of time to add to the current clock instant
     * @return this fixture, ready for the when phase
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When givenTimeElapses(Duration duration) {
        return given(ignored -> advanceTimeBy(duration));
    }

    /**
     * Runs an arbitrary precondition against the fixture's Flux Capacitor and then waits for consumers.
     *
     * @param consumer the precondition to run
     * @return this fixture, ready for the when phase
     * @throws IllegalStateException if the precondition or the wait throws an {@link Exception}; the original
     *                               exception is attached as cause
     */
    @Override // io.fluxcapacitor.javaclient.test.Given
    public When given(Consumer<FluxCapacitor> consumer) {
        return (When) this.fluxCapacitor.apply(instance -> {
            try {
                consumer.accept(instance);
                waitForConsumers();
            } catch (Exception e) {
                throw new IllegalStateException("Failed to execute given", e);
            }
            return this;
        });
    }

    /**
     * Sends the given command, marked with the trace tag, and uses its dispatch result as the test result.
     *
     * @param obj the command or command message to send
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenCommand(Object obj) {
        return whenApplying(instance -> {
            Message traced = this.interceptor.trace(obj);
            return getDispatchResult(instance.commandGateway().send(traced));
        });
    }

    /**
     * Sends the given command after adding the given user to its metadata.
     *
     * @param obj  the command or command message to send
     * @param user the user to add to the command
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenCommandByUser(Object obj, User user) {
        Object[] withUser = addUser(user, obj);
        return whenCommand(withUser[0]);
    }

    /**
     * Sends the given query, marked with the trace tag, and uses its dispatch result as the test result.
     *
     * @param obj the query or query message to send
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenQuery(Object obj) {
        return whenApplying(instance -> {
            Message traced = this.interceptor.trace(obj);
            return getDispatchResult(instance.queryGateway().send(traced));
        });
    }

    /**
     * Sends the given query after adding the given user to its metadata.
     *
     * @param obj  the query or query message to send
     * @param user the user to add to the query
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenQueryByUser(Object obj, User user) {
        Object[] withUser = addUser(user, obj);
        return whenQuery(withUser[0]);
    }

    /**
     * Publishes the given event, marked with the trace tag, using {@code Guarantee.NONE}.
     *
     * @param obj the event or event message to publish
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenEvent(Object obj) {
        return when(instance -> instance.eventGateway().publish(this.interceptor.trace(obj), Guarantee.NONE));
    }

    /**
     * Stores the given events for an aggregate as the action under test.
     *
     * @param str    the aggregate id
     * @param objArr the events to store
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenDomainEvents(String str, Object... objArr) {
        return when(instance -> publishDomainEvents(str, instance, objArr));
    }

    /**
     * Runs a document search and uses everything it fetches as the test result.
     *
     * @param str           passed as the single element of the array given to {@code documentStore().search}
     * @param unaryOperator refines the search before all results are fetched
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenSearching(String str, UnaryOperator<Search> unaryOperator) {
        return whenApplying(instance -> {
            Search refined = (Search) unaryOperator.apply(instance.documentStore().search(new String[]{str}));
            return refined.fetchAll();
        });
    }

    /**
     * Schedules the given object, marked with the trace tag, at the fixture clock's current instant.
     *
     * @param obj the schedule payload or message
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenScheduleExpires(Object obj) {
        return when(instance -> instance.scheduler().schedule(this.interceptor.trace(obj), getClock().instant()));
    }

    /**
     * Moves the fixture's clock forward by the given duration as the action under test.
     *
     * @param duration the amount of time to add to the current clock instant
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenTimeElapses(Duration duration) {
        return when(ignored -> advanceTimeBy(duration));
    }

    /**
     * Moves the fixture's clock to the given instant as the action under test.
     *
     * @param instant the instant the clock should be fixed at
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenTimeAdvancesTo(Instant instant) {
        return when(ignored -> advanceTimeTo(instant));
    }

    /**
     * Runs an arbitrary action as the action under test; the test result is {@code null}.
     *
     * @param consumer the action to run against the fixture's Flux Capacitor
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then when(Consumer<FluxCapacitor> consumer) {
        Function<FluxCapacitor, Object> withoutResult = instance -> {
            consumer.accept(instance);
            return null;
        };
        return whenApplying(withoutResult);
    }

    /**
     * Runs the action under test and builds a validator for everything it produced.
     *
     * <p>Before the action runs, expired schedules are handled, consumers are awaited, mocks are reset and
     * result collection is switched on. An {@link Exception} thrown by the action becomes the result instead of
     * propagating. Afterwards consumers are awaited again and the validator is created from the recorded
     * commands, events, schedules and exceptions.
     *
     * <p>Whether or not the above succeeds, expired schedules are handled once more and the handler
     * registration made at construction is cancelled.
     *
     * @param function the action to run; its return value becomes the test result
     * @return a validator for the outcome
     */
    @Override // io.fluxcapacitor.javaclient.test.When
    public Then whenApplying(Function<FluxCapacitor, ?> function) {
        return (Then) this.fluxCapacitor.apply(fluxCapacitor -> {
            try {
                handleExpiredSchedulesLocally();
                waitForConsumers();
                resetMocks();
                this.collectingResults = true;
                Object result;
                try {
                    result = function.apply(fluxCapacitor);
                } catch (Exception e) {
                    // A failing action is a valid outcome: the validator receives the exception as result.
                    result = e;
                }
                waitForConsumers();
                return getResultValidator(result, this.commands, this.events, this.schedules, this.exceptions);
            } finally {
                handleExpiredSchedulesLocally();
                this.registration.cancel();
            }
        });
    }

    /**
     * Creates the validator returned from the when phase.
     *
     * @param obj   the result of the action under test
     * @param list  the recorded commands
     * @param list2 the recorded events
     * @param list3 the recorded schedules
     * @param list4 the recorded exceptions
     * @return a new {@code ResultValidator}
     */
    protected Then getResultValidator(Object obj, List<Message> list, List<Message> list2, List<Schedule> list3, List<Throwable> list4) {
        FluxCapacitor instance = getFluxCapacitor();
        // Note the argument order of the constructor call: events (list2) are passed before commands (list).
        return new ResultValidator(instance, obj, list2, list, list3, list4);
    }

    /**
     * Stores the given events for an aggregate, adding the aggregate id to the metadata of each event.
     *
     * <p>When none of the payloads is a {@link Data} object, all events are stored with one call to the event
     * store, using the last list index as sequence number. Otherwise the events are stored one by one with
     * their list index as sequence number: {@link Data} payloads are written through the event store client
     * as a {@link SerializedMessage}, all other events go through the event store.
     *
     * @param str           the aggregate id
     * @param fluxCapacitor the instance to store the events with
     * @param objArr        the events; collections and arrays are flattened into individual events
     */
    protected void publishDomainEvents(String str, FluxCapacitor fluxCapacitor, Object[] objArr) {
        List<Message> domainEvents = flatten(objArr).map(event -> {
            Message wrapped = event instanceof Message ? (Message) event : new Message(event);
            return wrapped.withMetadata(wrapped.getMetadata().with("$aggregateId", str));
        }).collect(Collectors.toList());
        boolean containsData = domainEvents.stream().anyMatch(candidate -> candidate.getPayload() instanceof Data);
        if (!containsData) {
            fluxCapacitor.eventStore().storeEvents(str, str, domainEvents.size() - 1, domainEvents);
            return;
        }
        int sequenceNumber = 0;
        for (Message domainEvent : domainEvents) {
            if (domainEvent.getPayload() instanceof Data) {
                SerializedMessage serialized = new SerializedMessage(
                        fluxCapacitor.serializer().serialize((Data) domainEvent.getPayload()),
                        domainEvent.getMetadata(),
                        domainEvent.getMessageId(),
                        Long.valueOf(domainEvent.getTimestamp().toEpochMilli()));
                fluxCapacitor.client().getEventStoreClient()
                        .storeEvents(str, "test", sequenceNumber, Collections.singletonList(serialized), false);
            } else {
                fluxCapacitor.eventStore().storeEvents(str, str, sequenceNumber, new Object[]{domainEvent});
            }
            sequenceNumber++;
        }
    }

    /**
     * In synchronous mode, removes the expired schedules from the in-memory scheduling client and lets the
     * scheduler handle them locally.
     *
     * <p>Nothing happens for asynchronous fixtures or when the scheduling client is not an
     * {@link InMemorySchedulingClient}. When the scheduler is not a {@link DefaultScheduler}, the expired
     * schedules are still removed from the client but are not handled.
     */
    protected void handleExpiredSchedulesLocally() {
        if (!this.synchronous) {
            return;
        }
        // Held as Object so the runtime type checks are meaningful whatever the declared return types are.
        Object schedulingClient = getFluxCapacitor().client().getSchedulingClient();
        if (!(schedulingClient instanceof InMemorySchedulingClient)) {
            return;
        }
        List<Schedule> expiredSchedules = ((InMemorySchedulingClient) schedulingClient)
                .removeExpiredSchedules(getFluxCapacitor().serializer());
        Object scheduler = getFluxCapacitor().scheduler();
        if (scheduler instanceof DefaultScheduler) {
            DefaultScheduler localScheduler = (DefaultScheduler) scheduler;
            expiredSchedules.forEach(schedule ->
                    localScheduler.handleLocally(schedule, schedule.serialize(getFluxCapacitor().serializer())));
        }
    }

    /**
     * In asynchronous mode, blocks until all consumers have processed their pending messages or the consumer
     * timeout has passed, and logs a warning listing the consumers that are still busy.
     *
     * <p>The wait is repeated until the deadline, so a spurious wakeup or a notification that arrives while
     * work is still pending does not end the wait early. A timeout of zero waits until the consumers are done;
     * a negative timeout does not wait at all. If the thread is interrupted, the interrupt flag is restored
     * and waiting stops.
     */
    protected void waitForConsumers() {
        if (this.synchronous) {
            return;
        }
        synchronized (this.consumers) {
            boolean finished = checkConsumers();
            if (!finished) {
                long timeoutMillis = this.consumerTimeout.toMillis();
                // Real elapsed time is measured with System.nanoTime(); the fixture's own clock is fixed.
                long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
                try {
                    while (!finished) {
                        long remainingMillis = 0L; // 0 makes Object.wait block until notified
                        if (timeoutMillis != 0L) {
                            remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                            if (remainingMillis <= 0L) {
                                break;
                            }
                        }
                        this.consumers.wait(remainingMillis);
                        finished = checkConsumers();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    finished = checkConsumers();
                }
            }
            if (!finished) {
                List<String> waitingConsumers = this.consumers.entrySet().stream()
                        .filter(entry -> !entry.getValue().isEmpty())
                        .map(entry -> entry.getKey().getName() + " : " + entry.getValue().stream()
                                .map(message -> message.getPayload() == null
                                        ? "Void" : message.getPayload().getClass().getSimpleName())
                                .collect(Collectors.joining(", ")))
                        .collect(Collectors.toList());
                log.warn("Some consumers in the test fixture did not finish processing all messages. This may cause your test to fail. Waiting consumers: {}", waitingConsumers);
            }
        }
    }

    /**
     * Resets, via {@code Mockito.reset}, the event store, scheduling and key-value clients plus the non-null
     * gateway and tracking clients of every message type. Duplicates are removed before the reset.
     */
    protected void resetMocks() {
        Client client = this.fluxCapacitor.client();
        Stream<Object> storageClients = Stream.<Object>of(
                client.getEventStoreClient(), client.getSchedulingClient(), client.getKeyValueClient());
        Stream<Object> messagingClients = Arrays.stream(MessageType.values())
                .flatMap(type -> Stream.<Object>of(client.getGatewayClient(type), client.getTrackingClient(type))
                        .filter(Objects::nonNull));
        Object[] mocks = Stream.concat(storageClients, messagingClients).distinct().toArray();
        Mockito.reset(mocks);
    }

    /**
     * Moves the fixture's clock forward by the given duration, relative to its current instant.
     *
     * @param duration the amount of time to add
     */
    protected void advanceTimeBy(Duration duration) {
        Instant target = getClock().instant().plus(duration);
        advanceTimeTo(target);
    }

    /**
     * Replaces the fixture's clock with one fixed at the given instant in the system default time zone.
     *
     * @param instant the instant the clock should be fixed at
     */
    protected void advanceTimeTo(Instant instant) {
        Clock fixedClock = Clock.fixed(instant, ZoneId.systemDefault());
        withClock(fixedClock);
    }

    /**
     * Records a command so it can be validated after the when phase.
     *
     * @param message the dispatched command
     */
    protected void registerCommand(Message message) {
        commands.add(message);
    }

    /**
     * Records an event so it can be validated after the when phase.
     *
     * @param message the dispatched event
     */
    protected void registerEvent(Message message) {
        events.add(message);
    }

    /**
     * Records a schedule so it can be validated after the when phase.
     *
     * @param schedule the dispatched schedule
     */
    protected void registerSchedule(Schedule schedule) {
        schedules.add(schedule);
    }

    /**
     * Records an exception thrown by a handler so it can be validated after the when phase.
     *
     * @param th the exception that was thrown
     */
    protected void registerException(Throwable th) {
        exceptions.add(th);
    }

    /**
     * Waits for a dispatch result: 1 millisecond in synchronous mode, the result timeout otherwise.
     *
     * <p>The method declares no checked exceptions, yet it propagates them unchanged: the cause of an
     * {@link ExecutionException}, a {@link TimeoutException} with an explanatory message (the original timeout
     * is attached as its cause), and an {@link InterruptedException} from the wait.
     *
     * @param completableFuture the future to wait for
     * @return the value the future completed with
     */
    protected Object getDispatchResult(CompletableFuture<?> completableFuture) {
        long timeoutMillis = this.synchronous ? 1L : this.resultTimeout.toMillis();
        try {
            return completableFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw TestFixture.<RuntimeException>rethrowUnchecked(e.getCause());
        } catch (TimeoutException e2) {
            TimeoutException timeout = new TimeoutException("Test fixture did not receive a dispatch result in time. Perhaps some messages did not have handlers?");
            timeout.initCause(e2);
            throw TestFixture.<RuntimeException>rethrowUnchecked(timeout);
        } catch (InterruptedException e3) {
            throw TestFixture.<RuntimeException>rethrowUnchecked(e3);
        }
    }

    /**
     * Throws the given throwable as-is without requiring a {@code throws} clause. The cast is erased at
     * runtime, so the throwable is neither wrapped nor altered. The declared return type only exists so that
     * callers can write {@code throw rethrowUnchecked(...)}; the method never returns.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable throwable) throws E {
        throw (E) throwable;
    }

    /**
     * Expands collections and object arrays among the arguments into their elements (one level deep).
     *
     * <p>Primitive arrays such as {@code byte[]} cannot be cast to {@code Object[]}; they are passed through
     * as a single element instead of causing a {@link ClassCastException}.
     *
     * @param objArr the values to flatten
     * @return a stream of the individual values
     * @throws NullPointerException when the stream is consumed and one of the arguments is {@code null}
     */
    protected Stream<Object> flatten(Object... objArr) {
        return Arrays.stream(objArr).flatMap(obj -> {
            Objects.requireNonNull(obj, "Values passed to the test fixture must not be null");
            if (obj instanceof Collection) {
                return ((Collection<?>) obj).stream();
            }
            if (obj instanceof Object[]) {
                return Arrays.stream((Object[]) obj);
            }
            return Stream.of(obj);
        });
    }

    /**
     * Converts the given values to messages and adds the given user to the metadata of each.
     *
     * @param user   the user to add
     * @param objArr the messages or payloads; collections and arrays are flattened first
     * @return the resulting messages as an array
     * @throws IllegalStateException if the fixture's Flux Capacitor has no user provider
     */
    protected Object[] addUser(User user, Object... objArr) {
        UserProvider userProvider = this.fluxCapacitor.userProvider();
        if (userProvider == null) {
            throw new IllegalStateException("UserProvider has not been configured");
        }
        return flatten(objArr).map(value -> {
            Message message = value instanceof Message ? (Message) value : new Message(value);
            return message.withMetadata(userProvider.addToMetadata(message.getMetadata(), user));
        }).toArray();
    }

    /**
     * Checks whether all consumers are done and, if so, wakes up every thread waiting on the consumers map.
     *
     * <p>A consumer is done when its pending list contains nothing but schedules whose deadline lies after the
     * fixture clock's current instant. Synchronous fixtures are always done.
     *
     * @return {@code true} if no consumer has work left
     */
    protected boolean checkConsumers() {
        if (this.synchronous) {
            return true;
        }
        synchronized (this.consumers) {
            for (List<Message> pending : this.consumers.values()) {
                for (Message message : pending) {
                    boolean notYetDue = message instanceof Schedule
                            && ((Schedule) message).getDeadline().isAfter(getClock().instant());
                    if (!notYetDue) {
                        return false;
                    }
                }
            }
            this.consumers.notifyAll();
            return true;
        }
    }

    /**
     * Returns the Flux Capacitor instance built for this fixture.
     *
     * @return the fixture's Flux Capacitor
     */
    public FluxCapacitor getFluxCapacitor() {
        return fluxCapacitor;
    }

    /**
     * Returns the maximum time to wait for a dispatch result in asynchronous mode.
     *
     * @return the result timeout
     */
    public Duration resultTimeout() {
        return resultTimeout;
    }

    /**
     * Sets the maximum time to wait for a dispatch result in asynchronous mode.
     *
     * @param duration the new result timeout
     * @return this fixture
     */
    public TestFixture resultTimeout(Duration duration) {
        resultTimeout = duration;
        return this;
    }

    /**
     * Returns the maximum time to wait for consumers to finish in asynchronous mode.
     *
     * @return the consumer timeout
     */
    public Duration consumerTimeout() {
        return consumerTimeout;
    }

    /**
     * Sets the maximum time to wait for consumers to finish in asynchronous mode.
     *
     * @param duration the new consumer timeout
     * @return this fixture
     */
    public TestFixture consumerTimeout(Duration duration) {
        consumerTimeout = duration;
        return this;
    }
}
