package org.axonframework.extensions.mongo.eventsourcing.tokenstore;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.thoughtworks.xstream.XStream;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.extensions.mongo.MongoTemplate;
import org.axonframework.extensions.mongo.eventsourcing.tokenstore.MongoTokenStore;
import org.axonframework.extensions.mongo.util.MongoTemplateFactory;
import org.axonframework.extensions.mongo.utils.TestSerializer;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/axonframework/extensions/mongo/eventsourcing/tokenstore/MongoTokenStoreTest.class */
class MongoTokenStoreTest {

    @Container
    private static final MongoDBContainer MONGO_CONTAINER = new MongoDBContainer("mongo:5");
    private MongoTokenStore tokenStore;
    private MongoTokenStore tokenStoreDifferentOwner;
    private MongoTemplate mongoTemplate;
    private MongoCollection<Document> trackingTokensCollection;
    private Serializer serializer;
    private final TemporalAmount claimTimeout = Duration.ofSeconds(5);
    private final Class<byte[]> contentType = byte[].class;
    private final boolean ensureIndexes = true;
    private final String testProcessorName = "testProcessorName";
    private final int testSegment = 9;
    private final int testSegmentCount = 10;
    private final String testOwner = "testOwner";

    MongoTokenStoreTest() {
    }

    @BeforeEach
    void setUp() {
        this.mongoTemplate = MongoTemplateFactory.build(MONGO_CONTAINER.getHost(), MONGO_CONTAINER.getFirstMappedPort().intValue());
        this.trackingTokensCollection = this.mongoTemplate.trackingTokensCollection();
        this.trackingTokensCollection.drop();
        this.serializer = TestSerializer.xStreamSerializer();
        MongoTokenStore.Builder ensureIndexes = MongoTokenStore.builder().mongoTemplate(this.mongoTemplate).serializer(this.serializer).claimTimeout(this.claimTimeout).contentType(this.contentType).ensureIndexes(true);
        this.tokenStore = ensureIndexes.nodeId("testOwner").build();
        this.tokenStoreDifferentOwner = ensureIndexes.nodeId("anotherOwner").build();
    }

    @AfterEach
    void tearDown() {
        this.trackingTokensCollection.drop();
    }

    @Test
    void testClaimAndUpdateToken() {
        this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        Assertions.assertNull(this.tokenStore.fetchToken("testProcessorName", 9));
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(1L);
        this.tokenStore.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken("testProcessorName", 9));
    }

    @Test
    void testInitializeTokens() {
        this.tokenStore.initializeTokenSegments("test1", 7);
        int[] fetchSegments = this.tokenStore.fetchSegments("test1");
        Arrays.sort(fetchSegments);
        Assertions.assertArrayEquals(new int[]{0, 1, 2, 3, 4, 5, 6}, fetchSegments);
    }

    @Test
    void testInitializeTokensAtGivenPosition() {
        this.tokenStore.initializeTokenSegments("test1", 7, new GlobalSequenceTrackingToken(10L));
        int[] fetchSegments = this.tokenStore.fetchSegments("test1");
        Arrays.sort(fetchSegments);
        Assertions.assertArrayEquals(new int[]{0, 1, 2, 3, 4, 5, 6}, fetchSegments);
        for (int i : fetchSegments) {
            Assertions.assertEquals(new GlobalSequenceTrackingToken(10L), this.tokenStore.fetchToken("test1", i));
        }
    }

    @Test
    void testInitializeTokensWhileAlreadyPresent() {
        this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        Assertions.assertThrows(UnableToClaimTokenException.class, () -> {
            this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        });
    }

    @Test
    void testAttemptToClaimAlreadyClaimedToken() {
        this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        Assertions.assertNull(this.tokenStore.fetchToken("testProcessorName", 9));
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(1L);
        this.tokenStore.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
        Assertions.assertThrows(UnableToClaimTokenException.class, () -> {
            this.tokenStoreDifferentOwner.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
        });
    }

    @Test
    void testAttemptToExtendClaimOnAlreadyClaimedToken() {
        this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        Assertions.assertNull(this.tokenStore.fetchToken("testProcessorName", 9));
        Assertions.assertThrows(UnableToClaimTokenException.class, () -> {
            this.tokenStoreDifferentOwner.extendClaim("testProcessorName", 9);
        });
    }

    @Test
    void testClaimAndExtend() {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(1L);
        this.tokenStore.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
        this.tokenStore.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
        try {
            this.tokenStoreDifferentOwner.fetchToken("testProcessorName", 9);
            Assertions.fail("Expected UnableToClaimTokenException");
        } catch (UnableToClaimTokenException e) {
        }
        this.tokenStore.extendClaim("testProcessorName", 9);
    }

    @Test
    void testReleaseClaimAndExtendClaim() {
        this.tokenStore.initializeTokenSegments("testProcessorName", 10);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "testProcessorName", 9);
        try {
            this.tokenStoreDifferentOwner.fetchToken("testProcessorName", 9);
            Assertions.fail("Expected UnableToClaimTokenException");
        } catch (UnableToClaimTokenException e) {
        }
        this.tokenStore.releaseClaim("testProcessorName", 9);
        Assertions.assertThrows(UnableToClaimTokenException.class, () -> {
            this.tokenStoreDifferentOwner.extendClaim("testProcessorName", 9);
        });
    }

    @Test
    void testFetchSegments() {
        this.tokenStore.initializeTokenSegments("processor1", 3);
        this.tokenStore.initializeTokenSegments("processor2", 1);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, this.tokenStore.fetchSegments("processor1"));
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments("processor2"));
        Assertions.assertArrayEquals(new int[0], this.tokenStore.fetchSegments("processor3"));
    }

    @Test
    void testFetchAvailableSegments() {
        this.tokenStore.initializeTokenSegments("processor1", 3);
        this.tokenStoreDifferentOwner.fetchToken("processor1", 0);
        List fetchAvailableSegments = this.tokenStore.fetchAvailableSegments("processor1");
        Assertions.assertEquals(2, fetchAvailableSegments.size());
        Assertions.assertEquals(3, this.tokenStoreDifferentOwner.fetchAvailableSegments("processor1").size());
        Assertions.assertEquals(Segment.computeSegment(1, new int[]{0, 1, 2}), fetchAvailableSegments.get(0));
        Assertions.assertEquals(Segment.computeSegment(2, new int[]{0, 1, 2}), fetchAvailableSegments.get(1));
    }

    @Test
    void testConcurrentAccess() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            int i2 = i;
            arrayList.add(newFixedThreadPool.submit(() -> {
                try {
                    MongoTokenStore build = MongoTokenStore.builder().mongoTemplate(this.mongoTemplate).serializer(this.serializer).claimTimeout(this.claimTimeout).nodeId(String.valueOf(i2)).contentType(this.contentType).ensureIndexes(true).build();
                    GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(i2);
                    build.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
                    build.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
                    return Integer.valueOf(i2);
                } catch (UnableToClaimTokenException e) {
                    return null;
                }
            }));
        }
        newFixedThreadPool.shutdown();
        Assertions.assertTrue(newFixedThreadPool.awaitTermination(10L, TimeUnit.SECONDS));
        List list = (List) arrayList.stream().filter(future -> {
            try {
                return future.get() != null;
            } catch (InterruptedException | ExecutionException e) {
                return false;
            }
        }).collect(Collectors.toList());
        Assertions.assertEquals(1, list.size());
        Assertions.assertEquals(new GlobalSequenceTrackingToken(r0.intValue()), MongoTokenStore.builder().mongoTemplate(this.mongoTemplate).serializer(this.serializer).claimTimeout(this.claimTimeout).nodeId(String.valueOf((Integer) ((Future) list.get(0)).get())).contentType(this.contentType).ensureIndexes(true).build().fetchToken("testProcessorName", 9));
    }

    @Test
    void testStoreAndFetchTokenResultsInTheSameTokenWithXStreamSerializer() {
        MongoTokenStore build = MongoTokenStore.builder().serializer(XStreamSerializer.builder().xStream(new XStream()).build()).mongoTemplate(this.mongoTemplate).claimTimeout(this.claimTimeout).contentType(this.contentType).nodeId("testOwner").ensureIndexes(true).build();
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(100L);
        build.initializeSegment(globalSequenceTrackingToken, "processorName", 0);
        build.storeToken(globalSequenceTrackingToken, "processorName", 0);
        Assertions.assertEquals(globalSequenceTrackingToken, build.fetchToken("processorName", 0));
    }

    @Test
    void testStoreAndFetchTokenResultsInTheSameTokenWithJacksonSerializer() {
        MongoTokenStore build = MongoTokenStore.builder().serializer(JacksonSerializer.builder().build()).mongoTemplate(this.mongoTemplate).claimTimeout(this.claimTimeout).contentType(this.contentType).nodeId("testOwner").ensureIndexes(true).build();
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(100L);
        build.initializeSegment(globalSequenceTrackingToken, "processorName", 0);
        build.storeToken(globalSequenceTrackingToken, "processorName", 0);
        Assertions.assertEquals(globalSequenceTrackingToken, build.fetchToken("processorName", 0));
    }

    @Test
    void testRequiresExplicitSegmentInitializationReturnsTrue() {
        Assertions.assertTrue(this.tokenStore.requiresExplicitSegmentInitialization());
    }

    @Test
    void testInitializeSegmentForNullTokenOnlyCreatesSegments() {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        int[] fetchSegments = this.tokenStore.fetchSegments("testProcessorName");
        Arrays.sort(fetchSegments);
        Assertions.assertArrayEquals(new int[]{9}, fetchSegments);
        Assertions.assertNull(this.tokenStore.fetchToken("testProcessorName", 9));
    }

    @Test
    void testInitializeSegmentInsertsTheProvidedTokenAndInitializesTheGivenSegment() {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(100L);
        this.tokenStore.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
        int[] fetchSegments = this.tokenStore.fetchSegments("testProcessorName");
        Arrays.sort(fetchSegments);
        Assertions.assertArrayEquals(new int[]{9}, fetchSegments);
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken("testProcessorName", 9));
    }

    @Test
    void testInitializeSegmentThrowsUnableToInitializeTokenExceptionForDuplicateKey() {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(100L);
        this.tokenStore.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
        Assertions.assertThrows(UnableToInitializeTokenException.class, () -> {
            this.tokenStore.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
        });
    }

    @Test
    void testDeleteTokenRemovesTheSpecifiedToken() {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        this.tokenStore.fetchToken("testProcessorName", 9);
        Assertions.assertTrue(this.mongoTemplate.trackingTokensCollection().find(Filters.and(new Bson[]{Filters.eq("processorName", "testProcessorName"), Filters.eq("segment", 9)})).iterator().hasNext());
        this.tokenStore.deleteToken("testProcessorName", 9);
        Assertions.assertFalse(this.mongoTemplate.trackingTokensCollection().find(Filters.and(new Bson[]{Filters.eq("processorName", "testProcessorName"), Filters.eq("segment", 9)})).iterator().hasNext());
    }

    @Test
    void testDeleteTokenThrowsUnableToClaimTokenExceptionIfTheCallingProcessDoesNotOwnTheToken() {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        Assertions.assertThrows(UnableToClaimTokenException.class, () -> {
            this.tokenStore.deleteToken("testProcessorName", 9);
        });
    }

    @Test
    void testEnsureIndexCreation() {
        boolean z = false;
        MongoCursor it = this.trackingTokensCollection.listIndexes().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (Objects.equals("processorName_1_segment_1", ((Document) it.next()).getString("name"))) {
                z = true;
                break;
            }
        }
        Assertions.assertTrue(z);
    }

    @Test
    void testRetrieveStorageIdentifierCreatesAndReturnsConfigTokenIdentifier() {
        Bson and = Filters.and(new Bson[]{Filters.eq("processorName", "__config"), Filters.eq("segment", 0)});
        Assertions.assertNull(this.trackingTokensCollection.find(and).first());
        Optional retrieveStorageIdentifier = this.tokenStore.retrieveStorageIdentifier();
        Assertions.assertTrue(retrieveStorageIdentifier.isPresent());
        Assertions.assertFalse(((String) retrieveStorageIdentifier.get()).isEmpty());
        Assertions.assertNotNull(this.trackingTokensCollection.find(and).first());
    }

    @Test
    void testRetrieveStorageIdentifierReturnsExistingConfigTokenIdentifier() {
        String uuid = UUID.randomUUID().toString();
        GenericTokenEntry genericTokenEntry = new GenericTokenEntry(new ConfigToken(Collections.singletonMap("id", uuid)), this.serializer, this.contentType, "__config", 0);
        this.mongoTemplate.trackingTokensCollection().insertOne(new Document("processorName", genericTokenEntry.getProcessorName()).append("segment", Integer.valueOf(genericTokenEntry.getSegment())).append("owner", genericTokenEntry.getOwner()).append("timestamp", Long.valueOf(genericTokenEntry.timestamp().toEpochMilli())).append("token", genericTokenEntry.getSerializedToken().getData()).append("tokenType", genericTokenEntry.getSerializedToken().getType().getName()));
        Optional retrieveStorageIdentifier = this.tokenStore.retrieveStorageIdentifier();
        Assertions.assertTrue(retrieveStorageIdentifier.isPresent());
        Assertions.assertEquals(uuid, (String) retrieveStorageIdentifier.get());
    }

    @Test
    void storeTokenConcurrently() throws InterruptedException {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        testConcurrency(() -> {
            this.tokenStore.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
            return true;
        }, () -> {
            this.tokenStoreDifferentOwner.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
            return true;
        });
    }

    @Test
    void deleteTokenConcurrently() throws InterruptedException {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(42L), "testProcessorName", 9);
        testConcurrency(() -> {
            this.tokenStore.deleteToken("testProcessorName", 9);
            return true;
        }, () -> {
            this.tokenStoreDifferentOwner.deleteToken("testProcessorName", 9);
            return true;
        });
    }

    @Test
    void fetchTokenConcurrently() throws InterruptedException {
        this.tokenStore.initializeSegment((TrackingToken) null, "testProcessorName", 9);
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        this.tokenStore.storeToken(globalSequenceTrackingToken, "testProcessorName", 9);
        this.tokenStore.releaseClaim("testProcessorName", 9);
        Assertions.assertEquals(globalSequenceTrackingToken, (TrackingToken) testConcurrency(() -> {
            return this.tokenStore.fetchToken("testProcessorName", 9);
        }, () -> {
            return this.tokenStoreDifferentOwner.fetchToken("testProcessorName", 9);
        }));
    }

    @Test
    void initializeSegmentWithTokenConcurrently() throws InterruptedException {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        testConcurrency(() -> {
            this.tokenStore.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
            return true;
        }, () -> {
            this.tokenStoreDifferentOwner.initializeSegment(globalSequenceTrackingToken, "testProcessorName", 9);
            return true;
        });
    }

    @Test
    void settingNullTransactionManagerShouldThrow() {
        MongoTokenStore.Builder builder = MongoTokenStore.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.transactionManager((TransactionManager) null);
        });
    }

    private <T> T testConcurrency(Supplier<T> supplier, Supplier<T> supplier2) throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        AtomicReference atomicReference = new AtomicReference(null);
        AtomicReference atomicReference2 = new AtomicReference(null);
        newFixedThreadPool.execute(() -> {
            atomicReference.set(supplier.get());
        });
        newFixedThreadPool.execute(() -> {
            atomicReference2.set(supplier2.get());
        });
        newFixedThreadPool.shutdown();
        Assertions.assertTrue(newFixedThreadPool.awaitTermination(6L, TimeUnit.SECONDS), "should complete in 6 seconds");
        if (atomicReference.get() == null) {
            Assertions.assertNotNull(atomicReference2.get(), "at least one of the results should be valid");
            return (T) atomicReference2.get();
        }
        Assertions.assertNull(atomicReference2.get(), "only one of the results should be valid");
        return (T) atomicReference.get();
    }
}
