package org.apache.kafka.coordinator.common.runtime;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.server.util.FutureUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/common/runtime/CoordinatorExecutorImplTest.class */
public class CoordinatorExecutorImplTest {
    private static final LogContext LOG_CONTEXT = new LogContext();
    private static final TopicPartition SHARD_PARTITION = new TopicPartition("__consumer_offsets", 0);
    private static final Duration WRITE_TIMEOUT = Duration.ofMillis(1000);
    private static final String TASK_KEY = "task";

    @Test
    public void testTaskSuccessfulLifecycle() {
        CoordinatorShard coordinatorShard = (CoordinatorShard) Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime coordinatorRuntime = (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl coordinatorExecutorImpl = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, coordinatorRuntime, executorService, WRITE_TIMEOUT);
        Mockito.when(coordinatorRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq(TASK_KEY), (TopicPartition) ArgumentMatchers.eq(SHARD_PARTITION), (Duration) ArgumentMatchers.eq(WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            Assertions.assertTrue(coordinatorExecutorImpl.isScheduled(TASK_KEY));
            Assertions.assertEquals(new CoordinatorResult(Collections.singletonList("record"), (Object) null), ((CoordinatorRuntime.CoordinatorWriteOperation) invocationOnMock.getArgument(3)).generateRecordsAndResult(coordinatorShard));
            return CompletableFuture.completedFuture(null);
        });
        Mockito.when(executorService.submit((Runnable) ArgumentMatchers.any(Runnable.class))).thenAnswer(invocationOnMock2 -> {
            Assertions.assertTrue(coordinatorExecutorImpl.isScheduled(TASK_KEY));
            ((Runnable) invocationOnMock2.getArgument(0)).run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            atomicBoolean.set(true);
            return "Hello!";
        };
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        coordinatorExecutorImpl.schedule(TASK_KEY, taskRunnable, (str, th) -> {
            atomicBoolean2.set(true);
            Assertions.assertEquals("Hello!", str);
            Assertions.assertNull(th);
            return new CoordinatorResult(Collections.singletonList("record"), (Object) null);
        });
        Assertions.assertTrue(atomicBoolean.get());
        Assertions.assertTrue(atomicBoolean2.get());
    }

    @Test
    public void testTaskFailedLifecycle() {
        CoordinatorShard coordinatorShard = (CoordinatorShard) Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime coordinatorRuntime = (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl coordinatorExecutorImpl = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, coordinatorRuntime, executorService, WRITE_TIMEOUT);
        Mockito.when(coordinatorRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq(TASK_KEY), (TopicPartition) ArgumentMatchers.eq(SHARD_PARTITION), (Duration) ArgumentMatchers.eq(WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            Assertions.assertEquals(new CoordinatorResult(Collections.emptyList(), (Object) null), ((CoordinatorRuntime.CoordinatorWriteOperation) invocationOnMock.getArgument(3)).generateRecordsAndResult(coordinatorShard));
            return CompletableFuture.completedFuture(null);
        });
        Mockito.when(executorService.submit((Runnable) ArgumentMatchers.any(Runnable.class))).thenAnswer(invocationOnMock2 -> {
            ((Runnable) invocationOnMock2.getArgument(0)).run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            atomicBoolean.set(true);
            throw new Exception("Oh no!");
        };
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        coordinatorExecutorImpl.schedule(TASK_KEY, taskRunnable, (str, th) -> {
            atomicBoolean2.set(true);
            Assertions.assertNull(str);
            Assertions.assertNotNull(th);
            Assertions.assertEquals("Oh no!", th.getMessage());
            return new CoordinatorResult(Collections.emptyList(), (Object) null);
        });
        Assertions.assertTrue(atomicBoolean.get());
        Assertions.assertTrue(atomicBoolean2.get());
    }

    @Test
    public void testTaskCancelledBeforeBeingExecuted() {
        CoordinatorRuntime coordinatorRuntime = (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl coordinatorExecutorImpl = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, coordinatorRuntime, executorService, WRITE_TIMEOUT);
        Mockito.when(executorService.submit((Runnable) ArgumentMatchers.any(Runnable.class))).thenAnswer(invocationOnMock -> {
            coordinatorExecutorImpl.cancel(TASK_KEY);
            ((Runnable) invocationOnMock.getArgument(0)).run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            atomicBoolean.set(true);
            return null;
        };
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        coordinatorExecutorImpl.schedule(TASK_KEY, taskRunnable, (str, th) -> {
            atomicBoolean2.set(true);
            return null;
        });
        Assertions.assertFalse(atomicBoolean.get());
        Assertions.assertFalse(atomicBoolean2.get());
    }

    @Test
    public void testTaskCancelledAfterBeingExecutedButBeforeWriteOperationIsExecuted() {
        CoordinatorShard coordinatorShard = (CoordinatorShard) Mockito.mock(CoordinatorShard.class);
        CoordinatorRuntime coordinatorRuntime = (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl coordinatorExecutorImpl = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, coordinatorRuntime, executorService, WRITE_TIMEOUT);
        Mockito.when(coordinatorRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq(TASK_KEY), (TopicPartition) ArgumentMatchers.eq(SHARD_PARTITION), (Duration) ArgumentMatchers.eq(WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            coordinatorExecutorImpl.cancel(TASK_KEY);
            CoordinatorRuntime.CoordinatorWriteOperation coordinatorWriteOperation = (CoordinatorRuntime.CoordinatorWriteOperation) invocationOnMock.getArgument(3);
            return FutureUtils.failedFuture(Assertions.assertThrows(RejectedExecutionException.class, () -> {
                coordinatorWriteOperation.generateRecordsAndResult(coordinatorShard);
            }));
        });
        Mockito.when(executorService.submit((Runnable) ArgumentMatchers.any(Runnable.class))).thenAnswer(invocationOnMock2 -> {
            ((Runnable) invocationOnMock2.getArgument(0)).run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            atomicBoolean.set(true);
            return "Hello!";
        };
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        coordinatorExecutorImpl.schedule(TASK_KEY, taskRunnable, (str, th) -> {
            atomicBoolean2.set(true);
            return null;
        });
        Assertions.assertTrue(atomicBoolean.get());
        Assertions.assertFalse(atomicBoolean2.get());
    }

    @Test
    public void testTaskSchedulingWriteOperationFailed() {
        CoordinatorRuntime coordinatorRuntime = (CoordinatorRuntime) Mockito.mock(CoordinatorRuntime.class);
        ExecutorService executorService = (ExecutorService) Mockito.mock(ExecutorService.class);
        CoordinatorExecutorImpl coordinatorExecutorImpl = new CoordinatorExecutorImpl(LOG_CONTEXT, SHARD_PARTITION, coordinatorRuntime, executorService, WRITE_TIMEOUT);
        Mockito.when(coordinatorRuntime.scheduleWriteOperation((String) ArgumentMatchers.eq(TASK_KEY), (TopicPartition) ArgumentMatchers.eq(SHARD_PARTITION), (Duration) ArgumentMatchers.eq(WRITE_TIMEOUT), (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any())).thenReturn(FutureUtils.failedFuture(new Throwable("Oh no!")));
        Mockito.when(executorService.submit((Runnable) ArgumentMatchers.any(Runnable.class))).thenAnswer(invocationOnMock -> {
            ((Runnable) invocationOnMock.getArgument(0)).run();
            return CompletableFuture.completedFuture(null);
        });
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CoordinatorExecutor.TaskRunnable taskRunnable = () -> {
            atomicBoolean.set(true);
            return "Hello!";
        };
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        coordinatorExecutorImpl.schedule(TASK_KEY, taskRunnable, (str, th) -> {
            atomicBoolean2.set(true);
            return new CoordinatorResult(Collections.emptyList(), (Object) null);
        });
        Assertions.assertTrue(atomicBoolean.get());
        Assertions.assertFalse(atomicBoolean2.get());
        Assertions.assertFalse(coordinatorExecutorImpl.isScheduled(TASK_KEY));
    }
}
