package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.class */
public class HighAvailabilityTaskAssignorTest {
    // Mock clock instance held per test object; not referenced by any method visible in this part of
    // the file (presumably used by tests further down -- TODO confirm).
    private final Time time = new MockTime();

    /**
     * Builds an {@link AssignmentConfigs} whose third constructor argument (the standby count, judging
     * by the sibling {@code getConfigWithStandbys(int, String)}) is fixed to zero.
     *
     * @param str value forwarded unchanged as the last constructor argument
     * @return a freshly constructed configuration
     */
    private AssignmentConfigs getConfigWithoutStandbys(String str) {
        return new AssignmentConfigs(
            100L,
            2,
            0, // no standbys, per the method name
            IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null,
            (OptionalInt) null,
            str
        );
    }

    /**
     * Convenience overload that delegates to {@code getConfigWithStandbys(int, String)} with a
     * count of one.
     *
     * @param str value forwarded unchanged to the two-argument overload
     * @return the configuration produced by the two-argument overload
     */
    private AssignmentConfigs getConfigWithStandbys(String str) {
        final int singleStandby = 1;
        return getConfigWithStandbys(singleStandby, str);
    }

    /**
     * Builds an {@link AssignmentConfigs} that differs from the no-standby variant only in its third
     * constructor argument, which is taken from {@code i}.
     *
     * @param i   value passed as the third constructor argument (the standby count, per the method name)
     * @param str value forwarded unchanged as the last constructor argument
     * @return a freshly constructed configuration
     */
    private AssignmentConfigs getConfigWithStandbys(int i, String str) {
        return new AssignmentConfigs(
            100L,
            2,
            i,
            IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null,
            (OptionalInt) null,
            str
        );
    }

    /**
     * Argument source for the parameterized tests in this class.
     *
     * <p>Each entry is a triple of (String, boolean, int). In the tests below the String is passed as the
     * last {@code AssignmentConfigs} constructor argument, the boolean is passed as the last argument of
     * {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}, and the int is passed to
     * {@code AssignmentTestUtils.assertBalancedTasks}. Test methods that declare only two parameters
     * consume just the leading String and boolean.
     *
     * @return the three argument sets
     */
    static Stream<Arguments> parameter() {
        // Plain varargs: casting the Arguments[] to Object[] would make Stream.of infer a
        // Stream<Object>, which is not assignable to the declared Stream<Arguments> return type.
        return Stream.of(
            Arguments.of("none", false, 1),
            Arguments.of("min_traffic", true, 1),
            Arguments.of("balance_subtopology", true, 4)
        );
    }

    /**
     * Three clients over nine tasks: the first is constructed with all tasks in its first task set and
     * lag 0, the second with all tasks in its second task set and lag 10, and the third with empty task
     * sets and lag {@code Long.MAX_VALUE} (the two sets are presumably "previous active" and
     * "previous standby" -- confirm against {@code ClientState}).
     *
     * <p>Expects {@code assign} to return {@code true}, the first two clients to each end up with nine
     * assigned tasks, and the third client with two.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);

        final Map<TaskId, Long> zeroLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final ClientState firstClient = new ClientState(
            allTasks, Collections.emptySet(), zeroLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);

        final Map<TaskId, Long> smallLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), allTasks, smallLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);

        final Map<TaskId, Long> maxLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), maxLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);

        final Map<ProcessId, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, secondClient),
            Utils.mkEntry(AssignmentTestUtils.PID_3, thirdClient));

        final AssignmentConfigs configs = new AssignmentConfigs(
            11L, 2, 1, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(firstClient, AssignmentTestUtils.hasAssignedTasks(allTasks.size()));
        MatcherAssert.assertThat(secondClient, AssignmentTestUtils.hasAssignedTasks(allTasks.size()));
        MatcherAssert.assertThat(thirdClient, AssignmentTestUtils.hasAssignedTasks(2));
        MatcherAssert.assertThat(assignResult, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Three clients over nine tasks: the first is constructed with all tasks in its first task set and
     * lag 0; the other two have empty task sets and lag {@code Long.MAX_VALUE}. The configuration's
     * first argument is {@code Long.MAX_VALUE} (the acceptable lag, per the test name).
     *
     * <p>Expects {@code assign} to return {@code false} and every client to end up with six assigned
     * tasks.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldSkipWarmupsWhenAcceptableLagIsMax(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);

        final Map<TaskId, Long> zeroLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final ClientState firstClient = new ClientState(
            allTasks, Collections.emptySet(), zeroLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);

        // Each lagging client gets its own lag map, exactly as many maps as before.
        final Map<TaskId, Long> secondClientLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), secondClientLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);

        final Map<TaskId, Long> thirdClientLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), thirdClientLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);

        final Map<ProcessId, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, firstClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, secondClient),
            Utils.mkEntry(AssignmentTestUtils.PID_3, thirdClient));

        final AssignmentConfigs configs = new AssignmentConfigs(
            Long.MAX_VALUE, 1, 1, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(firstClient, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(secondClient, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(thirdClient, AssignmentTestUtils.hasAssignedTasks(6));
        MatcherAssert.assertThat(assignResult, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Nine tasks over three clients, each constructed with empty task sets, a shared lag map of 10 per
     * task and an integer argument of 1 (presumably the client's capacity -- confirm against
     * {@code ClientState}).
     *
     * <p>Expects {@code assign} to return {@code false} and the resulting assignment to pass the
     * validity and balance helpers. The {@code assertBalancedTasks} check with skew {@code i} is skipped
     * for the {@code "min_traffic"} strategy.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        final boolean minTrafficStrategy = str.equals("min_traffic");
        if (!minTrafficStrategy) {
            AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        }
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Same scenario as the three-client / nine-task case, except every client is constructed with an
     * integer argument of 3 instead of 1 (presumably the number of threads, per the test name -- confirm
     * against {@code ClientState}).
     *
     * <p>Expects {@code assign} to return {@code false} and the assignment to pass the validity and
     * balance helpers. The {@code assertBalancedTasks} check is skipped for {@code "min_traffic"}.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfThreadsIntegralDivisorOfNumberOfTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        final boolean minTrafficStrategy = str.equals("min_traffic");
        if (!minTrafficStrategy) {
            AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        }
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Nine tasks over only two clients, each constructed with empty task sets, a shared lag map of 10
     * per task and an integer argument of 1.
     *
     * <p>Expects {@code assign} to return {@code false} and the assignment to pass the validity and
     * balance helpers, including {@code assertBalancedTasks} with skew {@code i} for every strategy.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Six tasks over three clients constructed with integer arguments 1, 2 and 3 respectively
     * (presumably their stream-thread counts, per the test name -- confirm against {@code ClientState}).
     *
     * <p>Expects {@code assign} to return {@code false}, the clients to receive 1, 2 and 3 active tasks
     * respectively, and {@code analyzeTaskAssignmentBalance(..., 1)} to report at least one skewed task.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamThreads(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());

        MatcherAssert.assertThat(firstClient, AssignmentTestUtils.hasActiveTasks(1));
        MatcherAssert.assertThat(secondClient, AssignmentTestUtils.hasActiveTasks(2));
        MatcherAssert.assertThat(thirdClient, AssignmentTestUtils.hasActiveTasks(3));

        // The per-client task counts differ, so a skew analysis with threshold 1 must flag something.
        final AssignmentTestUtils.TaskSkewReport skewReport =
            AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates, 1);
        final boolean noSkewFound = skewReport.totalSkewedTasks() == 0;
        if (noSkewFound) {
            Assertions.fail("Expected a skewed task assignment, but was: " + skewReport);
        }
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Two tasks over three clients, each constructed with empty task sets, a shared lag map of 10 per
     * task and an integer argument of 1.
     *
     * <p>Expects {@code assign} to return {@code false} and the assignment to pass the validity and
     * balance helpers, including {@code assertBalancedTasks} with skew {@code i}.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Nine tasks over three clients, each constructed with an integer argument of 9 (presumably as many
     * stream threads per client as there are tasks, per the test name -- confirm against
     * {@code ClientState}).
     *
     * <p>Expects {@code assign} to return {@code false} and the assignment to pass the validity and
     * balance helpers. The {@code assertBalancedTasks} check is skipped for {@code "min_traffic"}.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithEqualStreamThreadsPerClientAsTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);
        final Map<TaskId, Long> lags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), lags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 9, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(2, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        final boolean minTrafficStrategy = str.equals("min_traffic");
        if (!minTrafficStrategy) {
            AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        }
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Four tasks over three clients: the first client is constructed with all tasks in its first task
     * set and lag 0; the other two have empty task sets and share a lag map of {@code Long.MAX_VALUE}.
     * The configuration's second argument is {@code size / 3 + 1} (presumably the warm-up replica
     * limit -- confirm against {@code AssignmentConfigs}).
     *
     * <p>Expects {@code assign} to return {@code true} and each of the two lagging clients to hold at
     * least {@code size / 3} standby tasks afterwards.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1);
        final Map<TaskId, Long> zeroLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final Map<TaskId, Long> maxLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));

        final ClientState firstClient = new ClientState(
            allTasks, Collections.emptySet(), zeroLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), maxLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), maxLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, (allTasks.size() / 3) + 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(true));

        MatcherAssert.assertThat(
            secondClient.standbyTaskCount(), Matchers.greaterThanOrEqualTo(allTasks.size() / 3));
        MatcherAssert.assertThat(
            thirdClient.standbyTaskCount(), Matchers.greaterThanOrEqualTo(allTasks.size() / 3));
        AssignmentTestUtils.assertValidAssignment(
            0, (allTasks.size() / 3) + 1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Four tasks over three clients: the first client is constructed with all tasks in its first task
     * set and lag 0; the second and third each carry exactly one task in their second task set, with
     * lag 0 for that task and {@code Long.MAX_VALUE} for every other task. Which task each of them
     * carries depends on {@code z}.
     *
     * <p>Expects {@code assign} to return {@code false} and the assignment to pass the validity and
     * balance helpers, including {@code assertBalancedTasks} with skew {@code i}.
     *
     * <p>A single {@link AssignmentConfigs} instance is handed to both the rack-aware assignor and
     * {@code assign}, as in the other tests of this class, so the strategy under test ({@code str}) is
     * no longer dropped from the configuration that {@code assign} sees.
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1);

        // The task each lagging client has already caught up on is chosen per parameter set.
        final TaskId secondClientTask = z ? AssignmentTestUtils.TASK_1_1 : AssignmentTestUtils.TASK_0_1;
        final TaskId thirdClientTask = z ? AssignmentTestUtils.TASK_0_1 : AssignmentTestUtils.TASK_1_0;
        final Set<TaskId> secondClientTasks = Utils.mkSet(secondClientTask);
        final Set<TaskId> thirdClientTasks = Utils.mkSet(thirdClientTask);

        final Map<TaskId, Long> zeroLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        final Map<TaskId, Long> secondClientLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        secondClientLags.put(secondClientTask, 0L);
        final Map<TaskId, Long> thirdClientLags =
            allTasks.stream().collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        thirdClientLags.put(thirdClientTask, 0L);

        final ClientState firstClient = new ClientState(
            allTasks, Collections.emptySet(), zeroLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_1);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), secondClientTasks, secondClientLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_2);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), thirdClientTasks, thirdClientLags,
            AssignmentTestUtils.EMPTY_CLIENT_TAGS, 5, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        // One configuration shared by the rack-aware assignor and the assignor under test.
        final AssignmentConfigs configs = new AssignmentConfigs(
            0L, (allTasks.size() / 3) + 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT,
            AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);

        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1))));

        final boolean assignResult =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);
        MatcherAssert.assertThat(assignResult, Matchers.is(false));

        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Assigns nine stateful tasks to two fresh clients whose fifth {@code ClientState} constructor
     * argument is 6 and 3 respectively, then expects six active tasks on the first client, three on
     * the second, and {@code assign} to return {@code false}.
     *
     * @param str passed as the last {@link AssignmentConfigs} constructor argument
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveStatefulTasksEvenlyOverStreamThreadsButBestEffortOverClients(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2,
            AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2);
        // Both clients are handed the same lag map: every task has a lag of 10.
        final Map<TaskId, Long> uniformLags = allTasks.stream().collect(Collectors.toMap(task -> task, task -> 10L));

        final ClientState largerClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), uniformLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 6, AssignmentTestUtils.PID_1);
        final ClientState smallerClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), uniformLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(largerClient, smallerClient);

        final AssignmentConfigs configs = new AssignmentConfigs(0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2)),
            Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)),
            Utils.mkEntry(new TopologyMetadata.Subtopology(2, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1, AssignmentTestUtils.TASK_2_2)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, allTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        MatcherAssert.assertThat(largerClient, AssignmentTestUtils.hasActiveTasks(6));
        MatcherAssert.assertThat(smallerClient, AssignmentTestUtils.hasActiveTasks(3));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * A single client that previously ran only TASK_0_0 must end up with both TASK_0_0 and TASK_0_1
     * as active tasks and no standbys; {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithoutStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     * @param i   passed as the second argument to {@code assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState onlyClient = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_1);
        final Map<ProcessId, ClientState> clientStates = Collections.singletonMap(AssignmentTestUtils.PID_1, onlyClient);

        final AssignmentConfigs configs = getConfigWithoutStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        // Only TASK_0_0 is declared stateful in the call to assign.
        final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
            clientStates, allTasks, Collections.singleton(AssignmentTestUtils.TASK_0_0), rackAwareAssignor, configs);

        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        MatcherAssert.assertThat(onlyClient, AssignmentTestUtils.hasActiveTasks(2));
        MatcherAssert.assertThat(onlyClient, AssignmentTestUtils.hasStandbyTasks(0));
        AssignmentTestUtils.assertValidAssignment(0, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * With a standby-enabled config, one client that previously ran TASK_0_0 and a second client with
     * no previous tasks, the second client must receive a standby task and {@code assign} must
     * return {@code false}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     * @param i   passed as the second argument to {@code assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldComputeNewAssignmentIfThereAreUnassignedStandbyTasks(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);

        final ClientState previousOwner = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_1);
        final ClientState idleClient = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, previousOwner),
            Utils.mkEntry(AssignmentTestUtils.PID_2, idleClient));

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.PID_2).standbyTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.assertValidAssignment(1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * The first client previously ran stateful TASK_0_0 but reports a lag of 500 on it, while the
     * second client (previously running TASK_0_1) reports a lag of 0 on TASK_0_0. The two active
     * tasks are expected to swap owners and {@code assign} to return {@code true}.
     *
     * @param str passed to {@code getConfigWithoutStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     * @param i   passed as the second argument to {@code assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldComputeNewAssignmentIfActiveTasksWasNotOnCaughtUpClient(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);

        final ClientState laggingOwner = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_0),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 500L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_1);
        final ClientState zeroLagClient = new ClientState(
            Collections.singleton(AssignmentTestUtils.TASK_0_1),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 0L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, laggingOwner),
            Utils.mkEntry(AssignmentTestUtils.PID_2, zeroLagClient));

        final AssignmentConfigs configs = getConfigWithoutStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.PID_1).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.PID_2).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.assertValidAssignment(0, 1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Three clients with no previous tasks report lags of {@code Long.MAX_VALUE}, 1000 and 500 on the
     * single stateful task. The client with lag 500 is expected to get the active task, the other
     * two a standby each, and {@code assign} to return {@code true}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     * @param i   passed as the second argument to {@code assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignToMostCaughtUpIfActiveTasksWasNotOnCaughtUpClient(String str, boolean z, int i) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0);

        final ClientState maxLagClient = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, Long.MAX_VALUE),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_1);
        final ClientState lag1000Client = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 1000L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_2);
        final ClientState lag500Client = new ClientState(
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.singletonMap(AssignmentTestUtils.TASK_0_0, 500L),
            AssignmentTestUtils.EMPTY_CLIENT_TAGS,
            1,
            AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates = Utils.mkMap(
            Utils.mkEntry(AssignmentTestUtils.PID_1, maxLagClient),
            Utils.mkEntry(AssignmentTestUtils.PID_2, lag1000Client),
            Utils.mkEntry(AssignmentTestUtils.PID_3, lag500Client));

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null), Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        // Active placement: only the lag-500 client owns the task.
        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.PID_1).activeTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.PID_2).activeTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.PID_3).activeTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0)));
        // Standby placement: the two remaining clients each hold the task as a standby.
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.PID_1).standbyTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(
            clientStates.get(AssignmentTestUtils.PID_2).standbyTasks(),
            Matchers.is(Collections.singleton(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(clientStates.get(AssignmentTestUtils.PID_3).standbyTasks(), Matchers.is(Collections.emptySet()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.assertValidAssignment(1, 1, allTasks, Collections.emptySet(), clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Two clients built by {@code getMockClientWithPreviousCaughtUpTasks}, one for TASK_0_0 and one for
     * TASK_0_1, are expected to keep that task as active and receive the other task as a standby.
     * {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignStandbysForStatefulTasks(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState firstClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0), statefulTasks, AssignmentTestUtils.PID_1);
        final ClientState secondClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_1), statefulTasks, AssignmentTestUtils.PID_2);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(firstClient, secondClient);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(firstClient.activeTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(secondClient.activeTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(firstClient.standbyTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(secondClient.standbyTasks(), CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0)));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Two tasks are assigned with an empty stateful-task set and a standby-enabled config. Each of the
     * two clients is expected to get exactly one active task and no standby tasks, and
     * {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldNotAssignStandbysForStatelessTasks(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        // No task is stateful in this scenario.
        final Set<TaskId> statefulTasks = AssignmentTestUtils.EMPTY_TASKS;

        final ClientState firstClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_1);
        final ClientState secondClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(firstClient, secondClient);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(firstClient.activeTaskCount(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(secondClient.activeTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(firstClient, secondClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * With the no-standby config, the first client (built for both tasks) is expected to keep both
     * tasks active with no standbys, while the second client (built with no tasks) gets exactly one
     * standby task and no active tasks. {@code assign} is expected to return {@code true}.
     *
     * @param str passed to {@code getConfigWithoutStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignWarmupReplicasEvenIfNoStandbyReplicasConfigured(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState caughtUpClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1), statefulTasks, AssignmentTestUtils.PID_1);
        final ClientState emptyClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(caughtUpClient, emptyClient);

        final AssignmentConfigs configs = getConfigWithoutStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            caughtUpClient.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        MatcherAssert.assertThat(emptyClient.standbyTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(caughtUpClient);
        assertHasNoActiveTasks(emptyClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Four stateful tasks all sit on the first client and the config is built as
     * {@code new AssignmentConfigs(100L, 1, 0, ...)}. The second client is expected to receive exactly
     * one standby task and no active tasks, and {@code assign} is expected to return {@code true}.
     *
     * @param str passed as the last {@link AssignmentConfigs} constructor argument
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldNotAssignMoreThanMaxWarmupReplicas(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);

        final ClientState caughtUpClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3),
            statefulTasks,
            AssignmentTestUtils.PID_1);
        final ClientState emptyClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(caughtUpClient, emptyClient);

        final AssignmentConfigs configs = new AssignmentConfigs(100L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            caughtUpClient.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        MatcherAssert.assertThat(emptyClient.standbyTaskCount(), CoreMatchers.equalTo(1));
        assertHasNoStandbyTasks(caughtUpClient);
        assertHasNoActiveTasks(emptyClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Four stateful tasks all sit on the first client and the config is built as
     * {@code new AssignmentConfigs(100L, 1, 1, ...)}. The second client is expected to hold all four
     * tasks as standbys and no active tasks, the first client no standbys, and {@code assign} is
     * expected to return {@code true}.
     *
     * @param str passed as the last {@link AssignmentConfigs} constructor argument
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldNotAssignWarmupAndStandbyToTheSameClient(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);

        final ClientState caughtUpClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3),
            statefulTasks,
            AssignmentTestUtils.PID_1);
        final ClientState emptyClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(caughtUpClient, emptyClient);

        final AssignmentConfigs configs = new AssignmentConfigs(100L, 1, 1, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS, (OptionalInt) null, (OptionalInt) null, str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            caughtUpClient.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        MatcherAssert.assertThat(
            emptyClient.standbyTasks(),
            CoreMatchers.equalTo(Utils.mkSet(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        assertHasNoStandbyTasks(caughtUpClient);
        assertHasNoActiveTasks(emptyClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * With a standby-enabled config but only one client, that client is expected to hold both tasks
     * as active and no standbys, and {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldNotAssignAnyStandbysWithInsufficientCapacity(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState onlyClient = getMockClientWithPreviousCaughtUpTasks(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1), statefulTasks, AssignmentTestUtils.PID_1);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(onlyClient);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            onlyClient.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        assertHasNoStandbyTasks(onlyClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * The only client is built with an empty first argument to
     * {@code getMockClientWithPreviousCaughtUpTasks}. It is still expected to receive both tasks as
     * active with no standbys, and {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignActiveTasksToNotCaughtUpClientIfNoneExist(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> statefulTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        final ClientState onlyClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_1);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(onlyClient);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(
            onlyClient.activeTasks(),
            CoreMatchers.equalTo(Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1)));
        assertHasNoStandbyTasks(onlyClient);
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Three clients, of which only the first is built with the four stateful tasks, are assigned with
     * a standby-enabled config. The result must satisfy
     * {@code assertValidAssignment(1, 2, ...)} and {@code assign} must return {@code true}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldNotAssignMoreThanMaxWarmupReplicasWithStandbys(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);

        final ClientState caughtUpClient =
            getMockClientWithPreviousCaughtUpTasks(statefulTasks, statefulTasks, AssignmentTestUtils.PID_1);
        final ClientState secondClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final ClientState thirdClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_3);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(caughtUpClient, secondClient, thirdClient);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(1, 2, statefulTasks, Collections.emptySet(), clientStates, new StringBuilder());
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Four stateful tasks (subtopology 0) and three stateless tasks (subtopology 1) are assigned to
     * two clients, of which only the first is built with the stateful tasks. The assignment must be
     * valid and balanced, the skew report from {@code analyzeTaskAssignmentBalance(clientStates, 1)}
     * must list at least one skewed subtopology, and {@code assign} must return {@code true}.
     *
     * @param str passed to {@code getConfigWithStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatelessTasksToBalanceTotalTaskLoad(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);
        final Set<TaskId> statefulTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        final Set<TaskId> statelessTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);

        final ClientState caughtUpClient =
            getMockClientWithPreviousCaughtUpTasks(statefulTasks, statefulTasks, AssignmentTestUtils.PID_1);
        final ClientState emptyClient =
            getMockClientWithPreviousCaughtUpTasks(AssignmentTestUtils.EMPTY_TASKS, statefulTasks, AssignmentTestUtils.PID_2);
        final Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(caughtUpClient, emptyClient);

        final AssignmentConfigs configs = getConfigWithStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
            Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(1, 2, statefulTasks, statelessTasks, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedStatefulAssignment(statefulTasks, clientStates, new StringBuilder());
        // The report's string form doubles as the failure message of the assertion below.
        final AssignmentTestUtils.TaskSkewReport skewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates, 1);
        MatcherAssert.assertThat(skewReport.toString(), skewReport.skewedSubtopologies(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(true));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, true, z);
    }

    /**
     * Nine tasks, all stateful and all with lag 0, are assigned to three fresh clients whose fifth
     * {@code ClientState} constructor argument is 100, 50 and 1. Every client is expected to end up
     * with at least one active task, and {@code assign} is expected to return {@code false}.
     *
     * @param str passed to {@code getConfigWithoutStandbys}
     * @param z   passed as the last argument to {@code verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatefulActiveTasksToAllClients(String str, boolean z) {
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3,
            AssignmentTestUtils.TASK_2_0);
        // Every client reports a lag of 0 for every task.
        final Map<TaskId, Long> zeroLags = allTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        // The stateful set is a separate copy of the full task set.
        final Set<TaskId> statefulTasks = new HashSet<>(allTasks);

        final ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), zeroLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 100);
        final ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), zeroLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 50);
        final ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), zeroLags, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        final Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        final AssignmentConfigs configs = getConfigWithoutStandbys(str);
        final Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksPerSubtopology = Utils.mkMap(
            Utils.mkEntry(new TopologyMetadata.Subtopology(0, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
            Utils.mkEntry(new TopologyMetadata.Subtopology(1, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(new TopologyMetadata.Subtopology(2, (String) null),
                Utils.mkSet(AssignmentTestUtils.TASK_2_0)));
        final RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(configs, tasksPerSubtopology);

        final boolean probingRebalanceNeeded =
            new HighAvailabilityTaskAssignor().assign(clientStates, allTasks, statefulTasks, rackAwareAssignor, configs);

        MatcherAssert.assertThat(firstClient.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(secondClient.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(thirdClient.activeTasks(), Matchers.not(Matchers.empty()));
        MatcherAssert.assertThat(probingRebalanceNeeded, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(rackAwareAssignor, allTasks, clientStates, false, z);
    }

    /**
     * Checks that when two clients already split four stateful tasks between them, the assignor
     * keeps each client's active tasks equal to its previous active tasks and returns
     * {@code false}.
     *
     * <p>Which pair of tasks each client previously owned depends on {@code z}.
     *
     * @param str assignment strategy string handed to {@code getConfigWithoutStandbys}
     * @param z   selects the previous task split and is forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldReturnFalseIfPreviousAssignmentIsReused(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        Set<TaskId> statefulTaskIds = new HashSet<>(allTaskIds);

        Set<TaskId> firstClientTasks;
        Set<TaskId> secondClientTasks;
        if (z) {
            firstClientTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3);
            secondClientTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        } else {
            firstClientTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2);
            secondClientTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3);
        }

        ClientState firstClient =
            getMockClientWithPreviousCaughtUpTasks(firstClientTasks, statefulTaskIds, AssignmentTestUtils.PID_1);
        ClientState secondClient =
            getMockClientWithPreviousCaughtUpTasks(secondClientTasks, statefulTaskIds, AssignmentTestUtils.PID_2);
        Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(firstClient, secondClient);

        AssignmentConfigs configs = getConfigWithoutStandbys(str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        MatcherAssert.assertThat(result, Matchers.is(false));
        MatcherAssert.assertThat(firstClient.activeTasks(), CoreMatchers.equalTo(firstClient.prevActiveTasks()));
        MatcherAssert.assertThat(secondClient.activeTasks(), CoreMatchers.equalTo(secondClient.prevActiveTasks()));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that with four tasks, an empty stateful-task set and two clients without previous
     * tasks, {@code assign} returns {@code false} and neither client receives a standby task.
     *
     * @param str assignment strategy string handed to {@code getConfigWithoutStandbys}
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldReturnFalseIfNoWarmupTasksAreAssigned(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3);
        Set<TaskId> statefulTaskIds = AssignmentTestUtils.EMPTY_TASKS;

        ClientState firstClient = getMockClientWithPreviousCaughtUpTasks(
            AssignmentTestUtils.EMPTY_TASKS, statefulTaskIds, AssignmentTestUtils.PID_1);
        ClientState secondClient = getMockClientWithPreviousCaughtUpTasks(
            AssignmentTestUtils.EMPTY_TASKS, statefulTaskIds, AssignmentTestUtils.PID_2);
        Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(firstClient, secondClient);

        AssignmentConfigs configs = getConfigWithoutStandbys(str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        MatcherAssert.assertThat(result, Matchers.is(false));
        assertHasNoStandbyTasks(firstClient, secondClient);
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that when one client previously owned both stateful tasks and a second client owned
     * none, {@code assign} returns {@code true} and the second client ends up with exactly one
     * standby task.
     *
     * @param str assignment strategy string handed to {@code getConfigWithoutStandbys}
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldReturnTrueIfWarmupTasksAreAssigned(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);
        Set<TaskId> statefulTaskIds = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1);

        ClientState ownerClient =
            getMockClientWithPreviousCaughtUpTasks(allTaskIds, statefulTaskIds, AssignmentTestUtils.PID_1);
        ClientState emptyClient = getMockClientWithPreviousCaughtUpTasks(
            AssignmentTestUtils.EMPTY_TASKS, statefulTaskIds, AssignmentTestUtils.PID_2);

        AssignmentConfigs configs = getConfigWithoutStandbys(str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1))));
        Map<ProcessId, ClientState> clientStates = AssignmentTestUtils.getClientStatesMap(ownerClient, emptyClient);

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        MatcherAssert.assertThat(result, Matchers.is(true));
        MatcherAssert.assertThat(emptyClient.standbyTaskCount(), CoreMatchers.equalTo(1));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that seven tasks, none of them stateful, are assigned validly and in a balanced way
     * over three clients that are each constructed with 7 as the last {@code ClientState}
     * argument (the same number as there are tasks), and that {@code assign} returns
     * {@code false}.
     *
     * @param str assignment strategy string passed as the last {@link AssignmentConfigs} argument
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithEqualStreamThreadsPerClientAsTasksAndNoStatefulTasks(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2);
        Set<TaskId> statefulTaskIds = AssignmentTestUtils.EMPTY_TASKS;
        Set<TaskId> statelessTaskIds = new HashSet<>(allTaskIds);
        // One empty map instance is shared by all three clients.
        Map<TaskId, Long> emptyTaskMap = new HashMap<>();

        ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7);
        ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7);
        ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 7);
        Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
                        AssignmentTestUtils.TASK_1_2))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(result, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that seven tasks, none of them stateful, are assigned validly and in a balanced way
     * over three clients that are each constructed with 2 as the last {@code ClientState}
     * argument (fewer than there are tasks), and that {@code assign} returns {@code false}.
     *
     * @param str assignment strategy string passed as the last {@link AssignmentConfigs} argument
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithLessStreamThreadsPerClientAsTasksAndNoStatefulTasks(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2);
        Set<TaskId> statefulTaskIds = AssignmentTestUtils.EMPTY_TASKS;
        Set<TaskId> statelessTaskIds = new HashSet<>(allTaskIds);
        // One empty map instance is shared by all three clients.
        Map<TaskId, Long> emptyTaskMap = new HashMap<>();

        ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2);
        ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2);
        ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2);
        Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
                        AssignmentTestUtils.TASK_1_2))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(result, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that seven tasks, none of them stateful, are assigned validly and in a balanced way
     * over three clients constructed with 1, 2 and 3 as the last {@code ClientState} argument,
     * and that {@code assign} returns {@code false}.
     *
     * @param str assignment strategy string passed as the last {@link AssignmentConfigs} argument
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatelessTasksEvenlyOverClientsWithUnevenlyDistributedStreamThreadsAndNoStatefulTasks(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2);
        Set<TaskId> statefulTaskIds = AssignmentTestUtils.EMPTY_TASKS;
        Set<TaskId> statelessTaskIds = new HashSet<>(allTaskIds);
        // One empty map instance is shared by all three clients.
        Map<TaskId, Long> emptyTaskMap = new HashMap<>();

        ClientState firstClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1);
        ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 2);
        ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(firstClient, secondClient, thirdClient);

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
                        AssignmentTestUtils.TASK_1_2))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(result, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Checks that seven tasks, none of them stateful, are assigned validly and in a balanced way
     * over three clients when the first client is constructed with all seven tasks as its first
     * {@code ClientState} argument and the other two with empty sets, and that {@code assign}
     * returns {@code false}.
     *
     * @param str assignment strategy string passed as the last {@link AssignmentConfigs} argument
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldDistributeStatelessTasksEvenlyWithPreviousAssignmentAndNoStatefulTasks(String str, boolean z) {
        Set<TaskId> allTaskIds = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2);
        Set<TaskId> statefulTaskIds = AssignmentTestUtils.EMPTY_TASKS;
        Set<TaskId> statelessTaskIds = new HashSet<>(allTaskIds);
        // One empty map instance is shared by all three clients.
        Map<TaskId, Long> emptyTaskMap = new HashMap<>();

        // Only the first client is seeded with tasks (the full stateless set).
        ClientState seededClient = new ClientState(
            statelessTaskIds, Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        ClientState secondClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        ClientState thirdClient = new ClientState(
            Collections.emptySet(), Collections.emptySet(), emptyTaskMap, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 3);
        Map<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getClientStatesMap(seededClient, secondClient, thirdClient);

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 0, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            (OptionalInt) null, (OptionalInt) null, str);
        RackAwareTaskAssignor rackAwareAssignor = AssignmentTestUtils.getRackAwareTaskAssignor(
            configs,
            Utils.mkMap(
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(0, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
                        AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3)),
                Utils.mkEntry(
                    new TopologyMetadata.Subtopology(1, (String) null),
                    Utils.mkSet(AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
                        AssignmentTestUtils.TASK_1_2))));

        boolean result = new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(
            0, AssignmentTestUtils.EMPTY_TASKS, statelessTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        MatcherAssert.assertThat(result, Matchers.is(false));
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, false, z);
    }

    /**
     * Runs the assignor against input produced by the {@code AssignmentTestUtils.getRandom*}
     * helpers and checks that the result is a valid, balanced active assignment.
     *
     * <p>The config comes from {@code getConfigWithStandbys(3, str)} and the assignment is
     * validated with 3 as the first argument of {@code assertValidAssignment} (presumably the
     * standby replica count; confirm against the helper). When {@code str} is
     * {@code "balance_subtopology"}, {@code assertBalancedTasks} is additionally run with
     * {@code i}.
     *
     * @param str assignment strategy string used to build the config
     * @param z   forwarded as the last argument of
     *            {@code AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor}
     * @param i   second argument of {@code AssignmentTestUtils.assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldAssignRandomInput(String str, boolean z, int i) {
        SortedMap<TaskId, Set<TopicPartition>> partitionsByTask =
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, false);
        AssignmentConfigs configs = getConfigWithStandbys(3, str);
        // Helper calls stay inline so they are evaluated in the original argument order.
        RackAwareTaskAssignor rackAwareAssignor = Mockito.spy(new RackAwareTaskAssignor(
            AssignmentTestUtils.getRandomCluster(50, 60, 3),
            partitionsByTask,
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, true),
            AssignmentTestUtils.getTasksForTopicGroup(60, 3),
            AssignmentTestUtils.getRandomProcessRacks(50, 50),
            AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog(50, 60, 3),
            configs,
            this.time));

        SortedSet<TaskId> allTaskIds = (SortedSet<TaskId>) partitionsByTask.keySet();
        List<Set<TaskId>> taskSubsets = AssignmentTestUtils.getRandomSubset(allTaskIds, 2);
        Set<TaskId> statefulTaskIds = taskSubsets.get(0);
        Set<TaskId> statelessTaskIds = taskSubsets.get(1);
        SortedMap<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getRandomClientState(50, 60, 3, 3, false, statefulTaskIds);

        new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);

        AssignmentTestUtils.assertValidAssignment(
            3, statefulTaskIds, statelessTaskIds, clientStates, new StringBuilder());
        AssignmentTestUtils.assertBalancedActiveAssignment(clientStates, new StringBuilder());
        AssignmentTestUtils.verifyTaskPlacementWithRackAwareAssignor(
            rackAwareAssignor, allTaskIds, clientStates, true, z);
        if (str.equals("balance_subtopology")) {
            AssignmentTestUtils.assertBalancedTasks(clientStates, i);
        }
    }

    /**
     * Runs the assignor on input from the {@code AssignmentTestUtils.getRandom*} helpers with an
     * {@link AssignmentConfigs} whose sixth and seventh arguments are 0 and 10, then checks the
     * outcome depending on {@code str}:
     * <ul>
     *   <li>{@code "none"}: only {@code assertValidAssignment} is checked;</li>
     *   <li>{@code "balance_subtopology"}: {@code assertBalancedTasks} is additionally run with
     *       {@code i};</li>
     *   <li>any other value: the assignor is run again with strategy {@code "none"} on a copy of
     *       the resulting client states, and every client must keep the same stateful active
     *       tasks and standby tasks in both maps.</li>
     * </ul>
     *
     * @param str assignment strategy string passed as the last {@link AssignmentConfigs} argument
     * @param z   unused in this test
     * @param i   second argument of {@code AssignmentTestUtils.assertBalancedTasks}
     */
    @MethodSource({"parameter"})
    @ParameterizedTest
    public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(String str, boolean z, int i) {
        SortedMap<TaskId, Set<TopicPartition>> partitionsByTask =
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, false);
        Cluster cluster = AssignmentTestUtils.getRandomCluster(50, 60, 3);
        SortedMap<TaskId, Set<TopicPartition>> secondPartitionsByTask =
            AssignmentTestUtils.getTaskTopicPartitionMap(60, 3, true);
        Map<TopologyMetadata.Subtopology, Set<TaskId>> tasksBySubtopology =
            AssignmentTestUtils.getTasksForTopicGroup(60, 3);
        Map<ProcessId, Map<String, Optional<String>>> processRacks =
            AssignmentTestUtils.getRandomProcessRacks(50, 50);
        InternalTopicManager topicManager =
            AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog(50, 60, 3);

        AssignmentConfigs configs = new AssignmentConfigs(
            0L, 1, 1, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            0, 10, str);
        RackAwareTaskAssignor rackAwareAssignor = Mockito.spy(new RackAwareTaskAssignor(
            cluster, partitionsByTask, secondPartitionsByTask, tasksBySubtopology, processRacks,
            topicManager, configs, this.time));

        SortedSet<TaskId> allTaskIds = (SortedSet<TaskId>) partitionsByTask.keySet();
        List<Set<TaskId>> taskSubsets = AssignmentTestUtils.getRandomSubset(allTaskIds, 2);
        Set<TaskId> statefulTaskIds = taskSubsets.get(0);
        Set<TaskId> statelessTaskIds = taskSubsets.get(1);
        SortedMap<ProcessId, ClientState> clientStates =
            AssignmentTestUtils.getRandomClientState(50, 60, 3, 3, false, statefulTaskIds);

        new HighAvailabilityTaskAssignor()
            .assign(clientStates, allTaskIds, statefulTaskIds, rackAwareAssignor, configs);
        AssignmentTestUtils.assertValidAssignment(
            1, statefulTaskIds, statelessTaskIds, clientStates, new StringBuilder());

        if (str.equals("none")) {
            return;
        }
        if (str.equals("balance_subtopology")) {
            AssignmentTestUtils.assertBalancedTasks(clientStates, i);
            return;
        }

        // Second pass: same inputs, strategy "none", starting from a copy of the first result.
        SortedMap<ProcessId, ClientState> copiedStates = AssignmentTestUtils.copyClientStateMap(clientStates);
        AssignmentConfigs noneConfigs = new AssignmentConfigs(
            0L, 1, 1, IntegrationTestUtils.DEFAULT_TIMEOUT, AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS,
            0, 10, "none");
        RackAwareTaskAssignor noneAssignor = Mockito.spy(new RackAwareTaskAssignor(
            cluster, partitionsByTask, secondPartitionsByTask, tasksBySubtopology, processRacks,
            topicManager, noneConfigs, this.time));
        new HighAvailabilityTaskAssignor()
            .assign(copiedStates, allTaskIds, statefulTaskIds, noneAssignor, noneConfigs);

        for (Map.Entry<ProcessId, ClientState> entry : clientStates.entrySet()) {
            ClientState firstPass = entry.getValue();
            MatcherAssert.assertThat(
                firstPass.statefulActiveTasks(),
                Matchers.equalTo(copiedStates.get(entry.getKey()).statefulActiveTasks()));
            MatcherAssert.assertThat(
                firstPass.standbyTasks(),
                Matchers.equalTo(copiedStates.get(entry.getKey()).standbyTasks()));
        }
    }

    /**
     * Asserts that {@code activeTasks()} is empty for each of the given client states.
     *
     * @param clientStateArr client states to check, in order
     */
    private static void assertHasNoActiveTasks(ClientState... clientStateArr) {
        Stream.of(clientStateArr).forEach(state ->
            MatcherAssert.assertThat(state.activeTasks(), Matchers.is(Matchers.empty())));
    }

    /**
     * Asserts that each of the given client states matches
     * {@code AssignmentTestUtils.hasStandbyTasks(0)}.
     *
     * @param clientStateArr client states to check, in order
     */
    private static void assertHasNoStandbyTasks(ClientState... clientStateArr) {
        Stream.of(clientStateArr).forEach(state ->
            MatcherAssert.assertThat(state, AssignmentTestUtils.hasStandbyTasks(0)));
    }

    /**
     * Builds a {@link ClientState} for a client whose previous tasks are {@code set}.
     *
     * <p>Every task in {@code set2} gets an entry in the map handed to the {@code ClientState}
     * constructor: {@code 0L} if the task is also in {@code set}, {@link Long#MAX_VALUE}
     * otherwise. The client is created with an empty second task set,
     * {@code AssignmentTestUtils.EMPTY_CLIENT_TAGS} and {@code 1} as the fifth constructor
     * argument.
     *
     * @param set       tasks passed as the first {@code ClientState} argument; must be a subset
     *                  of {@code set2}
     * @param set2      all stateful tasks to create map entries for
     * @param processId process id passed to the {@code ClientState} constructor
     * @return the newly created client state
     * @throws IllegalArgumentException if {@code set2} does not contain every task in {@code set}
     */
    private static ClientState getMockClientWithPreviousCaughtUpTasks(Set<TaskId> set, Set<TaskId> set2, ProcessId processId) {
        if (!set2.containsAll(set)) {
            throw new IllegalArgumentException("Need to initialize stateful tasks set before creating mock clients");
        }
        Map<TaskId, Long> valueByTask = new HashMap<>();
        for (TaskId task : set2) {
            valueByTask.put(task, set.contains(task) ? 0L : Long.MAX_VALUE);
        }
        return new ClientState(
            set, Collections.emptySet(), valueByTask, AssignmentTestUtils.EMPTY_CLIENT_TAGS, 1, processId);
    }
}
