package org.infinispan.remoting.transport.jgroups;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.transport.BackupResponse;
import org.infinispan.remoting.transport.XSiteAsyncAckListener;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.util.ControlledTimeService;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.xsite.XSiteBackup;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "remoting.transport.jgroups.JGroupsBackupResponseUnitTest")
/* loaded from: input_file:org/infinispan/remoting/transport/jgroups/JGroupsBackupResponseUnitTest.class */
public class JGroupsBackupResponseUnitTest extends AbstractInfinispanTest {
    private final ControlledTimeService timeService = new ControlledTimeService();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/remoting/transport/jgroups/JGroupsBackupResponseUnitTest$Listener.class */
    public static class Listener implements XSiteAsyncAckListener, LongConsumer {
        private final BlockingDeque<ListenerData> queue = new LinkedBlockingDeque();

        private Listener() {
        }

        public void onAckReceived(long j, String str, Throwable th) {
            this.queue.add(new ListenerData(j, str, th));
        }

        @Override // java.util.function.LongConsumer
        public void accept(long j) {
            this.queue.add(new ListenerData(j, null, null));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/remoting/transport/jgroups/JGroupsBackupResponseUnitTest$ListenerData.class */
    public static class ListenerData {
        private final long time;
        private final String siteName;
        private final Throwable throwable;

        private ListenerData(long j, String str, Throwable th) {
            this.time = j;
            this.siteName = str;
            this.throwable = th;
        }
    }

    private static Map<XSiteBackup, CompletableFuture<ValidResponse>> createResponseMap(Collection<XSiteBackup> collection) {
        HashMap hashMap = new HashMap(collection.size());
        Iterator<XSiteBackup> it = collection.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), new CompletableFuture());
        }
        return hashMap;
    }

    private static XSiteBackup createSyncBackup(String str, long j) {
        return new XSiteBackup(str, true, j);
    }

    private static XSiteBackup createAsyncBackup(String str) {
        return new XSiteBackup(str, false, 15000L);
    }

    public void testNoWaitForAsyncWithMix() {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(createSyncBackup("sync", 10000L));
        arrayList.add(createAsyncBackup("async"));
        Map<XSiteBackup, CompletableFuture<ValidResponse>> createResponseMap = createResponseMap(arrayList);
        Future<Void> waitBackupResponse = waitBackupResponse(newBackupResponse(createResponseMap));
        assertNotCompleted(waitBackupResponse);
        createResponseMap.get(arrayList.get(0)).complete(null);
        assertCompleted(waitBackupResponse);
    }

    public void testNoWaitForAsyncWith() {
        assertCompleted(waitBackupResponse(newBackupResponse(createResponseMap(Collections.singletonList(createAsyncBackup("async-only"))))));
    }

    public void testAsyncListener() {
        Listener listener = new Listener();
        long time = this.timeService.time();
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(createAsyncBackup("async-1"));
        arrayList.add(createAsyncBackup("async-2"));
        Map<XSiteBackup, CompletableFuture<ValidResponse>> createResponseMap = createResponseMap(arrayList);
        BackupResponse newBackupResponse = newBackupResponse(createResponseMap);
        newBackupResponse.notifyAsyncAck(listener);
        AssertJUnit.assertTrue(listener.queue.isEmpty());
        this.timeService.advance(10L);
        createResponseMap.get(arrayList.get(0)).complete(null);
        assertListenerData(listener, time, "async-1", null);
        this.timeService.advance(10L);
        CacheException cacheException = new CacheException("Test-Exception");
        createResponseMap.get(arrayList.get(1)).completeExceptionally(cacheException);
        assertListenerData(listener, time, "async-2", cacheException);
        AssertJUnit.assertTrue(listener.queue.isEmpty());
        AssertJUnit.assertEquals(TimeUnit.NANOSECONDS.toMillis(time), newBackupResponse.getSendTimeMillis());
    }

    public void testSyncListener() {
        Listener listener = new Listener();
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(createSyncBackup("sync-1", 10000L));
        arrayList.add(createAsyncBackup("async-2"));
        Map<XSiteBackup, CompletableFuture<ValidResponse>> createResponseMap = createResponseMap(arrayList);
        BackupResponse newBackupResponse = newBackupResponse(createResponseMap);
        newBackupResponse.notifyFinish(listener);
        AssertJUnit.assertTrue(listener.queue.isEmpty());
        Future<Void> waitBackupResponse = waitBackupResponse(newBackupResponse);
        this.timeService.advance(10L);
        createResponseMap.get(arrayList.get(1)).complete(null);
        assertNotCompleted(waitBackupResponse);
        AssertJUnit.assertTrue(listener.queue.isEmpty());
        this.timeService.advance(10L);
        createResponseMap.get(arrayList.get(0)).complete(null);
        assertCompleted(waitBackupResponse);
        assertListenerData(listener, 20L, null, null);
        AssertJUnit.assertTrue(listener.queue.isEmpty());
    }

    public void testNoErrorsFromAsync() {
        ArrayList arrayList = new ArrayList(3);
        arrayList.add(createSyncBackup("sync-1", 10000L));
        arrayList.add(createSyncBackup("sync-2", 2 * 10000));
        arrayList.add(createAsyncBackup("async"));
        Map<XSiteBackup, CompletableFuture<ValidResponse>> createResponseMap = createResponseMap(arrayList);
        BackupResponse newBackupResponse = newBackupResponse(createResponseMap);
        this.timeService.advance(10000 + 1);
        Future<Void> waitBackupResponse = waitBackupResponse(newBackupResponse);
        assertNotCompleted(waitBackupResponse);
        CacheException cacheException = new CacheException("Test-Exception");
        createResponseMap.get(arrayList.get(1)).complete(null);
        createResponseMap.get(arrayList.get(2)).completeExceptionally(cacheException);
        assertCompleted(waitBackupResponse);
        AssertJUnit.assertEquals(1, newBackupResponse.getCommunicationErrors().size());
        AssertJUnit.assertEquals(1, newBackupResponse.getFailedBackups().size());
        AssertJUnit.assertTrue(newBackupResponse.getCommunicationErrors().contains("sync-1"));
        AssertJUnit.assertTrue(newBackupResponse.getFailedBackups().containsKey("sync-1"));
        Exceptions.assertException(TimeoutException.class, (Throwable) newBackupResponse.getFailedBackups().get("sync-1"));
    }

    public void testEmpty() {
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(createAsyncBackup("async"));
        AssertJUnit.assertTrue(newBackupResponse(createResponseMap(arrayList)).isEmpty());
        arrayList.add(createSyncBackup("sync", 10000L));
        AssertJUnit.assertFalse(newBackupResponse(createResponseMap(arrayList)).isEmpty());
    }

    private void assertListenerData(Listener listener, long j, String str, Throwable th) {
        try {
            ListenerData poll = listener.queue.poll(10L, TimeUnit.SECONDS);
            AssertJUnit.assertNotNull("Failed to get event for site " + str, poll);
            AssertJUnit.assertEquals(str, poll.siteName);
            AssertJUnit.assertEquals(j, poll.time);
            AssertJUnit.assertEquals(th, poll.throwable);
        } catch (InterruptedException e) {
            AssertJUnit.fail("Interrupted while waiting for event for site " + str);
        }
    }

    private void assertNotCompleted(Future<Void> future) {
        Exceptions.expectException(java.util.concurrent.TimeoutException.class, () -> {
            future.get(1L, TimeUnit.SECONDS);
        });
    }

    private void assertCompleted(Future<Void> future) {
        try {
            future.get(1L, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
            AssertJUnit.fail("Backup Response must be completed by now!");
        }
    }

    private Future<Void> waitBackupResponse(BackupResponse backupResponse) {
        Objects.requireNonNull(backupResponse);
        return fork(backupResponse::waitForBackupToFinish);
    }

    private BackupResponse newBackupResponse(Map<XSiteBackup, CompletableFuture<ValidResponse>> map) {
        return new JGroupsBackupResponse(map, this.timeService);
    }
}
