package org.axonframework.eventhandling.pooled;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.ReflectionUtils;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.utils.AssertUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/axonframework/eventhandling/pooled/CoordinatorTest.class */
/**
 * Unit tests for the pooled {@link Coordinator}.
 * <p>
 * Every collaborator (token store, executor, message source and work package) is a Mockito mock, so each test
 * drives the coordinator purely through stubbed answers and checks the interactions that result from
 * {@link Coordinator#start()}.
 */
class CoordinatorTest {

    private static final String PROCESSOR_NAME = "test";
    private static final int SEGMENT_ID = 0;
    // The arrays below are shared between tests; the tests themselves never modify them.
    private static final int[] EMPTY_SEGMENT_IDS = {};
    private static final int[] SEGMENT_IDS = {SEGMENT_ID};
    private static final Segment SEGMENT_ZERO = Segment.computeSegment(SEGMENT_ID, EMPTY_SEGMENT_IDS);
    private static final Segment SEGMENT_ONE = Segment.computeSegment(SEGMENT_ID, SEGMENT_IDS);

    private final TokenStore tokenStore = Mockito.mock(TokenStore.class);
    private final ScheduledThreadPoolExecutor executorService = Mockito.mock(ScheduledThreadPoolExecutor.class);
    @SuppressWarnings("unchecked") // Mockito can only hand back a raw mock for a generic interface.
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource =
            Mockito.mock(StreamableMessageSource.class);
    private final WorkPackage workPackage = Mockito.mock(WorkPackage.class);

    private Coordinator testSubject;

    /**
     * Builds a fresh {@link Coordinator} around the mocks. The work package factory always returns the single
     * {@link WorkPackage} mock, the event filter accepts every event, and the number of claimable segments equals
     * the length of {@code SEGMENT_IDS}.
     */
    @BeforeEach
    void setUp() {
        testSubject = Coordinator.builder()
                                 .name(PROCESSOR_NAME)
                                 .messageSource(messageSource)
                                 .tokenStore(tokenStore)
                                 .transactionManager(NoTransactionManager.instance())
                                 .executorService(executorService)
                                 .workPackageFactory((segment, trackingToken) -> workPackage)
                                 .initialToken(source -> ReplayToken.createReplayToken(source.createHeadToken()))
                                 .eventFilter(eventMessage -> true)
                                 .maxClaimedSegments(SEGMENT_IDS.length)
                                 .build();
    }

    /**
     * With both {@code openStream} and {@code releaseClaim} stubbed to throw, starting the coordinator must still
     * result in exactly one {@code schedule} call on the executor, and the token store must not be asked to
     * initialize token segments.
     */
    @Test
    void ifCoordinationTaskRescheduledAfterTokenReleaseClaimFails() {
        RuntimeException streamOpenException = new RuntimeException("Some exception during event stream open");
        RuntimeException releaseClaimException = new RuntimeException("Some exception during release claim");
        TrackingToken token = new GlobalSequenceTrackingToken(0L);

        Mockito.doReturn(SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
        Mockito.doReturn(token).when(tokenStore).fetchToken(Mockito.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doThrow(releaseClaimException)
               .when(tokenStore)
               .releaseClaim(Mockito.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doThrow(streamOpenException).when(messageSource).openStream(Mockito.any());
        Mockito.doReturn(CompletableFuture.completedFuture(streamOpenException))
               .when(workPackage)
               .abort(Mockito.any());
        Mockito.doReturn(SEGMENT_ZERO).when(workPackage).segment();
        Mockito.doAnswer(runTaskSync()).when(executorService).submit(Mockito.any(Runnable.class));

        testSubject.start();

        Mockito.verify(executorService, Mockito.times(1))
               .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
        Mockito.verify(tokenStore, Mockito.never())
               .initializeTokenSegments(Mockito.anyString(), Mockito.anyInt(), Mockito.any(TrackingToken.class));
    }

    /**
     * When the token store reports no segments at all, starting the coordinator must call
     * {@code initializeTokenSegments} exactly once, here with a {@code null} initial token, and make exactly one
     * {@code schedule} call on the executor.
     */
    @Test
    void ifCoordinationTaskInitializesTokenStoreWhenNeeded() {
        TrackingToken token = new GlobalSequenceTrackingToken(0L);

        Mockito.doReturn(EMPTY_SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
        Mockito.doReturn(token).when(tokenStore).fetchToken(Mockito.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doReturn(SEGMENT_ZERO).when(workPackage).segment();
        Mockito.doAnswer(runTaskSync()).when(executorService).submit(Mockito.any(Runnable.class));

        testSubject.start();

        Mockito.verify(executorService, Mockito.times(1))
               .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
        Mockito.verify(tokenStore, Mockito.times(1))
               .initializeTokenSegments(Mockito.anyString(), Mockito.anyInt(), Mockito.isNull());
    }

    /**
     * Feeds the coordinator a stream of two events that carry the same tracking token and checks that both reach
     * the work package through a single {@code scheduleEvents} call, never through {@code scheduleEvent}.
     * <p>
     * Submitted tasks run asynchronously in this test, hence the time-boxed verifications.
     */
    @Test
    void ifCoordinationTaskSchedulesEventsWithTheSameTokenTogether() throws InterruptedException {
        TrackingToken token = new GlobalSequenceTrackingToken(0L);
        TrackedEventMessage<String> firstEvent =
                new GenericTrackedEventMessage<>(token, GenericEventMessage.asEventMessage("this-event"));
        TrackedEventMessage<String> secondEvent =
                new GenericTrackedEventMessage<>(token, GenericEventMessage.asEventMessage("that-event"));
        List<TrackedEventMessage<?>> events = new ArrayList<>();
        events.add(firstEvent);
        events.add(secondEvent);

        Mockito.when(workPackage.hasRemainingCapacity()).thenReturn(true, false);
        Mockito.when(workPackage.isAbortTriggered()).thenReturn(false);
        Mockito.when(workPackage.scheduleEvents(events)).thenReturn(true);

        @SuppressWarnings("unchecked") // Mockito can only hand back a raw mock for a generic interface.
        BlockingStream<TrackedEventMessage<?>> eventStream = Mockito.mock(BlockingStream.class);
        Mockito.when(eventStream.setOnAvailableCallback(Mockito.any())).thenReturn(false);
        Mockito.when(eventStream.hasNextAvailable()).thenReturn(true, true, false);
        // doReturn(..) is used for the wildcard-typed methods: when(..).thenReturn(..) would need casts there.
        Mockito.doReturn(firstEvent, secondEvent).when(eventStream).nextAvailable();
        Mockito.doReturn(Optional.of(secondEvent), Optional.of(secondEvent), Optional.empty())
               .when(eventStream)
               .peek();

        Mockito.when(executorService.submit(Mockito.any(Runnable.class))).thenAnswer(runTaskAsync());
        Mockito.when(tokenStore.fetchSegments(PROCESSOR_NAME)).thenReturn(SEGMENT_IDS);
        Mockito.when(tokenStore.fetchAvailableSegments(PROCESSOR_NAME))
               .thenReturn(Collections.singletonList(SEGMENT_ONE));
        Mockito.when(tokenStore.fetchToken(PROCESSOR_NAME, SEGMENT_ONE)).thenReturn(token);
        Mockito.when(messageSource.openStream(token)).thenReturn(eventStream);

        testSubject.start();

        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Mockito.verify(tokenStore).fetchToken(PROCESSOR_NAME, SEGMENT_ONE);
        });
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Mockito.verify(messageSource).openStream(token);
        });
        @SuppressWarnings("unchecked") // A class literal cannot express the list's element type.
        ArgumentCaptor<List<TrackedEventMessage<?>>> eventsCaptor = ArgumentCaptor.forClass(List.class);
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Mockito.verify(workPackage).scheduleEvents(eventsCaptor.capture());
        });

        List<TrackedEventMessage<?>> scheduledEvents = eventsCaptor.getValue();
        Assertions.assertEquals(2, scheduledEvents.size());
        Assertions.assertTrue(scheduledEvents.contains(firstEvent));
        Assertions.assertTrue(scheduledEvents.contains(secondEvent));
        Mockito.verify(workPackage, Mockito.never()).scheduleEvent(Mockito.any());
    }

    /**
     * Registers the work package mock under {@code SEGMENT_ID} in the coordinator's {@code workPackages} field via
     * reflection, stubs {@code abort} with a future this test never completes, and checks that starting the
     * coordinator does not open an event stream.
     */
    @Test
    void coordinatorShouldNotTryToOpenStreamWithNoToken() throws NoSuchFieldException {
        TrackingToken token = new GlobalSequenceTrackingToken(0L);

        Mockito.doReturn(SEGMENT_IDS).when(tokenStore).fetchSegments(PROCESSOR_NAME);
        Mockito.doReturn(token).when(tokenStore).fetchToken(Mockito.eq(PROCESSOR_NAME), Mockito.anyInt());
        Mockito.doReturn(SEGMENT_ZERO).when(workPackage).segment();
        Mockito.doAnswer(runTaskSync()).when(executorService).submit(Mockito.any(Runnable.class));

        @SuppressWarnings("unchecked") // The field's generic type is not available through reflection.
        Map<Integer, WorkPackage> workPackages = (Map<Integer, WorkPackage>) ReflectionUtils.getFieldValue(
                Coordinator.class.getDeclaredField("workPackages"), testSubject
        );
        workPackages.put(SEGMENT_ID, workPackage);
        Mockito.doReturn(new CompletableFuture<Exception>()).when(workPackage).abort(Mockito.any());

        testSubject.start();

        Mockito.verify(messageSource, Mockito.never()).openStream(Mockito.any(TrackingToken.class));
    }

    /**
     * Creates an answer that runs the {@link Runnable} given as first invocation argument on the calling thread
     * and then returns an already completed future.
     *
     * @return an {@link Answer} executing the submitted task synchronously
     */
    private Answer<Future<Void>> runTaskSync() {
        return invocation -> {
            Runnable task = invocation.getArgument(0);
            task.run();
            return CompletableFuture.completedFuture(null);
        };
    }

    /**
     * Creates an answer that passes the {@link Runnable} given as first invocation argument to
     * {@link CompletableFuture#runAsync(Runnable)} and returns the resulting future.
     *
     * @return an {@link Answer} executing the submitted task asynchronously
     */
    private Answer<Future<Void>> runTaskAsync() {
        return invocation -> {
            Runnable task = invocation.getArgument(0);
            return CompletableFuture.runAsync(task);
        };
    }
}
