package org.apache.kafka.tools;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AbortTransactionResult;
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.TransactionsCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/**
 * Unit tests for {@link TransactionsCommand}.
 *
 * <p>Each test runs {@code TransactionsCommand.execute} against a Mockito-mocked {@link Admin},
 * captures everything the command prints into an in-memory stream, and observes the exit status
 * through a mock exit procedure that is installed before and removed after every test.
 */
public class TransactionsCommandTest {
    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    private final PrintStream out = new PrintStream(outputStream);
    private final MockTime time = new MockTime();
    private final Admin admin = Mockito.mock(Admin.class);

    /** Installs the mock exit procedure so the exit status can be inspected by the assertions. */
    @BeforeEach
    public void setupExitProcedure() {
        Exit.setExitProcedure(exitProcedure);
    }

    /** Restores the default exit procedure. */
    @AfterEach
    public void resetExitProcedure() {
        Exit.resetExitProcedure();
    }

    @Test
    public void testDescribeProducersTopicRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "describe-producers",
            "--partition", "0"
        });
    }

    @Test
    public void testDescribeProducersPartitionRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "describe-producers",
            "--topic", "foo"
        });
    }

    @Test
    public void testDescribeProducersLeader() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "describe-producers",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };
        testDescribeProducers(topicPartition, args, new DescribeProducersOptions());
    }

    @Test
    public void testDescribeProducersSpecificReplica() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        int brokerId = 5;
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "describe-producers",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--broker-id", String.valueOf(brokerId)
        };
        testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId));
    }

    /**
     * Stubs a two-producer result for {@code topicPartition}, runs the command and checks the
     * printed table.
     *
     * @param topicPartition  partition whose producers are described
     * @param args            command-line arguments passed to the command
     * @param expectedOptions options the command is expected to pass to {@code describeProducers}
     */
    private void testDescribeProducers(
        TopicPartition topicPartition,
        String[] args,
        DescribeProducersOptions expectedOptions
    ) throws Exception {
        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
        KafkaFuture<DescribeProducersResult.PartitionProducerState> describeFuture =
            KafkaFuture.completedFuture(new DescribeProducersResult.PartitionProducerState(Arrays.asList(
                new ProducerState(12345L, 15, 1300, 1599509565L, OptionalInt.of(20), OptionalLong.of(990L)),
                new ProducerState(98765L, 30, 2300, 1599509599L, OptionalInt.empty(), OptionalLong.empty())
            )));

        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
        Mockito.when(admin.describeProducers(Collections.singleton(topicPartition), expectedOptions))
            .thenReturn(describeResult);

        execute(args);
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        Assertions.assertEquals(3, table.size());
        Assertions.assertEquals(TransactionsCommand.DescribeProducersCommand.HEADERS, table.get(0));

        // Rows are compared as a set, so the order in which producers are printed is not asserted.
        Set<List<String>> expectedRows = Set.of(
            Arrays.asList("12345", "15", "20", "1300", "1599509565", "990"),
            Arrays.asList("98765", "30", "-1", "2300", "1599509599", "None")
        );
        Assertions.assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testListTransactions(boolean hasDurationFilter) throws Exception {
        String[] args = hasDurationFilter
            ? new String[]{
                "--bootstrap-server", "localhost:9092",
                "list",
                "--duration-filter", Long.toString(Long.MAX_VALUE)
            }
            : new String[]{
                "--bootstrap-server", "localhost:9092",
                "list"
            };

        Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>();
        transactions.put(0, Arrays.asList(
            new TransactionListing("foo", 12345L, TransactionState.ONGOING),
            new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT)
        ));
        transactions.put(1, Collections.singletonList(
            new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT)
        ));

        if (hasDurationFilter) {
            expectListTransactions(new ListTransactionsOptions().filterOnDuration(Long.MAX_VALUE), transactions);
        } else {
            expectListTransactions(transactions);
        }

        execute(args);
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        Assertions.assertEquals(4, table.size());
        Assertions.assertEquals(TransactionsCommand.ListTransactionsCommand.HEADERS, table.get(0));

        Set<List<String>> expectedRows = Set.of(
            Arrays.asList("foo", "0", "12345", "Ongoing"),
            Arrays.asList("bar", "0", "98765", "PrepareAbort"),
            Arrays.asList("baz", "1", "13579", "CompleteCommit")
        );
        Assertions.assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
    }

    @Test
    public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "describe"
        });
    }

    @Test
    public void testDescribeTransaction() throws Exception {
        String transactionalId = "foo";
        int coordinatorId = 5;
        long transactionStartTime = time.milliseconds();

        DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class);
        KafkaFuture<TransactionDescription> describeFuture = KafkaFuture.completedFuture(
            new TransactionDescription(
                coordinatorId,
                TransactionState.ONGOING,
                12345L,
                15,
                10000L,
                OptionalLong.of(transactionStartTime),
                Collections.singleton(new TopicPartition("bar", 0))
            ));

        Mockito.when(describeResult.description(transactionalId)).thenReturn(describeFuture);
        Mockito.when(admin.describeTransactions(Collections.singleton(transactionalId)))
            .thenReturn(describeResult);

        // Advance the mock clock past the start time; the expected row below contains "5000".
        time.sleep(5000L);

        execute(new String[]{
            "--bootstrap-server", "localhost:9092",
            "describe",
            "--transactional-id", transactionalId
        });
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        Assertions.assertEquals(2, table.size());
        Assertions.assertEquals(TransactionsCommand.DescribeTransactionsCommand.HEADERS, table.get(0));

        List<String> expectedRow = Arrays.asList(
            String.valueOf(coordinatorId),
            transactionalId,
            "12345",
            "15",
            "Ongoing",
            "10000",
            String.valueOf(transactionStartTime),
            "5000",
            "bar-0"
        );
        Assertions.assertEquals(expectedRow, table.get(1));
    }

    // NOTE: despite its name, this test runs the "abort" sub-command.
    @Test
    public void testDescribeTransactionsStartOffsetOrProducerIdRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", "foo",
            "--partition", "0"
        });
    }

    // NOTE: despite its name, this test runs the "abort" sub-command.
    @Test
    public void testDescribeTransactionsTopicRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--partition", "0",
            "--start-offset", "9990"
        });
    }

    // NOTE: despite its name, this test runs the "abort" sub-command.
    @Test
    public void testDescribeTransactionsPartitionRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", "foo",
            "--start-offset", "9990"
        });
    }

    // NOTE: despite its name, this test runs the "abort" sub-command.
    @Test
    public void testDescribeTransactionsProducerEpochRequiredWithProducerId() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", "foo",
            "--partition", "0",
            "--producer-id", "12345"
        });
    }

    // NOTE: despite its name, this test runs the "abort" sub-command.
    @Test
    public void testDescribeTransactionsCoordinatorEpochRequiredWithProducerId() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", "foo",
            "--partition", "0",
            "--producer-id", "12345",
            "--producer-epoch", "15"
        });
    }

    @Test
    public void testNewBrokerAbortTransaction() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        long startOffset = 9173L;
        long producerId = 12345L;
        short producerEpoch = 15;
        int coordinatorEpoch = 76;

        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--start-offset", String.valueOf(startOffset)
        };

        // The stubbed producer state carries the same start offset as the command line, and the
        // expected abort spec is built from that producer's id, epoch and coordinator epoch.
        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
        KafkaFuture<DescribeProducersResult.PartitionProducerState> describeFuture =
            KafkaFuture.completedFuture(new DescribeProducersResult.PartitionProducerState(
                Collections.singletonList(new ProducerState(
                    producerId, producerEpoch, 1300, 1599509565L,
                    OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset)))));

        AbortTransactionResult abortResult = Mockito.mock(AbortTransactionResult.class);
        KafkaFuture<Void> abortFuture = KafkaFuture.completedFuture(null);
        AbortTransactionSpec expectedAbortSpec =
            new AbortTransactionSpec(topicPartition, producerId, producerEpoch, coordinatorEpoch);

        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
        Mockito.when(admin.describeProducers(Collections.singleton(topicPartition))).thenReturn(describeResult);
        Mockito.when(abortResult.all()).thenReturn(abortFuture);
        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortResult);

        execute(args);
        assertNormalExit();
    }

    @ValueSource(ints = {29, -1})
    @ParameterizedTest
    public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        long producerId = 12345L;
        short producerEpoch = 15;

        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "abort",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--producer-id", String.valueOf(producerId),
            "--producer-epoch", String.valueOf(producerEpoch),
            "--coordinator-epoch", String.valueOf(coordinatorEpoch)
        };

        // A negative coordinator epoch on the command line is expected to reach the admin as 0.
        int expectedCoordinatorEpoch = Math.max(coordinatorEpoch, 0);
        AbortTransactionSpec expectedAbortSpec =
            new AbortTransactionSpec(topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch);

        AbortTransactionResult abortResult = Mockito.mock(AbortTransactionResult.class);
        KafkaFuture<Void> abortFuture = KafkaFuture.completedFuture(null);
        Mockito.when(abortResult.all()).thenReturn(abortFuture);
        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortResult);

        execute(args);
        assertNormalExit();
    }

    @Test
    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "find-hanging"
        });
    }

    @Test
    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", "localhost:9092",
            "find-hanging",
            "--broker-id", "0",
            "--partition", "5"
        });
    }

    /** Stubs {@code listTransactions} for default options. */
    private void expectListTransactions(Map<Integer, Collection<TransactionListing>> listingsByBroker) {
        expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
    }

    /**
     * Stubs {@code admin.listTransactions(options)} so that {@code all()} yields every listing in
     * {@code listingsByBroker} and {@code allByBrokerId()} yields the map itself.
     */
    private void expectListTransactions(
        ListTransactionsOptions options,
        Map<Integer, Collection<TransactionListing>> listingsByBroker
    ) {
        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);

        List<TransactionListing> allListings = new ArrayList<>();
        listingsByBroker.values().forEach(allListings::addAll);

        Mockito.when(listResult.all()).thenReturn(KafkaFuture.completedFuture(allListings));
        Mockito.when(listResult.allByBrokerId()).thenReturn(KafkaFuture.completedFuture(listingsByBroker));
    }

    /**
     * Stubs {@code describeProducers} (default options) for a single partition with exactly one
     * producer built from the given values; the producer's last sequence is fixed at 500.
     */
    private void expectDescribeProducers(
        TopicPartition topicPartition,
        long producerId,
        short producerEpoch,
        long lastTimestamp,
        OptionalInt coordinatorEpoch,
        OptionalLong txnStartOffset
    ) {
        DescribeProducersResult.PartitionProducerState partitionProducerState =
            new DescribeProducersResult.PartitionProducerState(Collections.singletonList(
                new ProducerState(producerId, producerEpoch, 500, lastTimestamp, coordinatorEpoch, txnStartOffset)));

        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        Mockito.when(result.all()).thenReturn(
            KafkaFuture.completedFuture(Collections.singletonMap(topicPartition, partitionProducerState)));
        Mockito.when(admin.describeProducers(Collections.singletonList(topicPartition), new DescribeProducersOptions()))
            .thenReturn(result);
    }

    /**
     * Stubs {@code describeProducers} for the given partitions and options with a result whose
     * {@code all()} future completes with an empty map.
     */
    private void expectNoProducers(List<TopicPartition> topicPartitions, DescribeProducersOptions options) {
        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        KafkaFuture<Map<TopicPartition, DescribeProducersResult.PartitionProducerState>> emptyFuture =
            KafkaFuture.completedFuture(Collections.emptyMap());
        Mockito.when(result.all()).thenReturn(emptyFuture);
        Mockito.when(admin.describeProducers(topicPartitions, options)).thenReturn(result);
    }

    /**
     * Stubs {@code describeTransactions} for the key set of {@code descriptions}; both the
     * per-id futures and the {@code all()} future complete with the supplied descriptions.
     */
    private void expectDescribeTransactions(Map<String, TransactionDescription> descriptions) {
        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
        descriptions.forEach((transactionalId, description) ->
            Mockito.when(result.description(transactionalId))
                .thenReturn(KafkaFuture.completedFuture(description)));
        Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(descriptions));
        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
    }

    /** Stubs {@code listTopics} (with internal topics listed) to return the given topic names. */
    private void expectListTopics(Set<String> topics) {
        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
        Mockito.when(result.names()).thenReturn(KafkaFuture.completedFuture(topics));
        ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
        Mockito.when(admin.listTopics(listOptions)).thenReturn(result);
    }

    /** Stubs {@code describeTopics} for the key set of {@code descriptions}. */
    private void expectDescribeTopics(Map<String, TopicDescription> descriptions) {
        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
        Mockito.when(result.allTopicNames()).thenReturn(KafkaFuture.completedFuture(descriptions));
        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
    }

    /**
     * Builds the description of topic {@code foo} shared by the find-hanging lookup tests:
     * partition 0 is replicated on brokers 0 and 1, partition 1 on brokers 1 and 5.
     */
    private static TopicDescription twoPartitionFooDescription() {
        Node node0 = new Node(0, "localhost", 9092);
        Node node1 = new Node(1, "localhost", 9093);
        Node node5 = new Node(5, "localhost", 9097);
        return new TopicDescription("foo", false, Arrays.asList(
            new TopicPartitionInfo(0, node0, Arrays.asList(node0, node1), Arrays.asList(node0, node1)),
            new TopicPartitionInfo(1, node1, Arrays.asList(node1, node5), Arrays.asList(node1, node5))
        ));
    }

    /** Builds the {@code find-hanging --topic ... --partition ...} arguments for one partition. */
    private static String[] findHangingArgs(TopicPartition topicPartition) {
        return new String[]{
            "--bootstrap-server", "localhost:9092",
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };
    }

    @Test
    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
        int brokerId = 5;
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "find-hanging",
            "--broker-id", String.valueOf(brokerId)
        };

        expectListTopics(Collections.singleton("foo"));
        expectDescribeTopics(Collections.singletonMap("foo", twoPartitionFooDescription()));
        // Only partition 1 has broker 5 among its replicas in the fixture.
        expectNoProducers(
            Collections.singletonList(new TopicPartition("foo", 1)),
            new DescribeProducersOptions().brokerId(brokerId));

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingLookupTopicAndBrokerId() throws Exception {
        int brokerId = 5;
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "find-hanging",
            "--broker-id", String.valueOf(brokerId),
            "--topic", "foo"
        };

        expectDescribeTopics(Collections.singletonMap("foo", twoPartitionFooDescription()));
        // Only partition 1 has broker 5 among its replicas in the fixture.
        expectNoProducers(
            Collections.singletonList(new TopicPartition("foo", 1)),
            new DescribeProducersOptions().brokerId(brokerId));

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
        String[] args = {
            "--bootstrap-server", "localhost:9092",
            "find-hanging",
            "--topic", "foo"
        };

        expectDescribeTopics(Collections.singletonMap("foo", twoPartitionFooDescription()));
        expectNoProducers(
            Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
            new DescribeProducersOptions());

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    /** Asserts that the command printed the find-hanging header and no data rows. */
    private void assertNoHangingTransactions() throws Exception {
        List<List<String>> table = readOutputAsTable();
        Assertions.assertEquals(1, table.size());
        Assertions.assertEquals(TransactionsCommand.FindHangingTransactionsCommand.HEADERS, table.get(0));
    }

    @Test
    public void testFindHangingSpecifiedTopicPartition() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);

        // The producer's last timestamp is the current mock time; only the header is expected.
        expectDescribeProducers(
            topicPartition, 132L, (short) 5, time.milliseconds(), OptionalInt.of(19), OptionalLong.of(29384L));

        execute(findHangingArgs(topicPartition));
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingNoMappedTransactionalId() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        long producerId = 132L;
        short producerEpoch = 5;
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));
        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.emptyList()));
        expectDescribeTransactions(Collections.emptyMap());

        execute(findHangingArgs(topicPartition));
        assertNormalExit();
        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    @Test
    public void testFindHangingWithNoTransactionDescription() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String transactionalId = "bar";
        long producerId = 132L;
        short producerEpoch = 5;
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));
        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(
                new TransactionListing(transactionalId, producerId, TransactionState.ONGOING))));

        // The transaction is listed, but describing it fails with TransactionalIdNotFoundException.
        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
        Mockito.when(result.description(transactionalId))
            .thenReturn(failedFuture(new TransactionalIdNotFoundException(transactionalId + " not found")));
        Mockito.when(admin.describeTransactions(Collections.singleton(transactionalId))).thenReturn(result);

        execute(findHangingArgs(topicPartition));
        assertNormalExit();
        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    /** Returns a future that has already completed exceptionally with {@code exception}. */
    private <T> KafkaFuture<T> failedFuture(Exception exception) {
        KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
        future.completeExceptionally(exception);
        return future;
    }

    @Test
    public void testFindHangingDoesNotFilterByTransactionInProgressWithDifferentPartitions() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String transactionalId = "bar";
        long producerId = 132L;
        short producerEpoch = 5;
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));
        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(
                new TransactionListing(transactionalId, producerId, TransactionState.ONGOING))));

        // The ongoing transaction covers foo-10 only, not the partition being inspected.
        expectDescribeTransactions(Collections.singletonMap(transactionalId, new TransactionDescription(
            1,
            TransactionState.ONGOING,
            producerId,
            producerEpoch,
            60000L,
            OptionalLong.of(time.milliseconds()),
            Collections.singleton(new TopicPartition("foo", 10)))));

        execute(findHangingArgs(topicPartition));
        assertNormalExit();
        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    /**
     * Asserts that the output is the find-hanging header followed by exactly one row built from
     * the given values; the last column is the whole number of minutes between
     * {@code lastTimestamp} and the current mock time.
     */
    private void assertHangingTransaction(
        TopicPartition topicPartition,
        long producerId,
        short producerEpoch,
        int coordinatorEpoch,
        long txnStartOffset,
        long lastTimestamp
    ) throws Exception {
        List<List<String>> table = readOutputAsTable();
        Assertions.assertEquals(2, table.size());
        Assertions.assertEquals(TransactionsCommand.FindHangingTransactionsCommand.HEADERS, table.get(0));

        long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);
        List<String> expectedRow = Arrays.asList(
            topicPartition.topic(),
            String.valueOf(topicPartition.partition()),
            String.valueOf(producerId),
            String.valueOf(producerEpoch),
            String.valueOf(coordinatorEpoch),
            String.valueOf(txnStartOffset),
            String.valueOf(lastTimestamp),
            String.valueOf(durationMinutes)
        );
        Assertions.assertEquals(expectedRow, table.get(1));
    }

    @Test
    public void testFindHangingFilterByTransactionInProgressWithSamePartition() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String transactionalId = "bar";
        long producerId = 132L;
        short producerEpoch = 5;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(19), OptionalLong.of(29384L));
        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(
                new TransactionListing(transactionalId, producerId, TransactionState.ONGOING))));

        // The ongoing transaction includes the inspected partition, so no row is expected.
        expectDescribeTransactions(Collections.singletonMap(transactionalId, new TransactionDescription(
            1,
            TransactionState.ONGOING,
            producerId,
            producerEpoch,
            60000L,
            OptionalLong.of(lastTimestamp),
            Collections.singleton(topicPartition))));

        execute(findHangingArgs(topicPartition));
        assertNormalExit();
        assertNoHangingTransactions();
    }

    /** Runs the command with the mocked admin, the capturing print stream and the mock clock. */
    private void execute(String[] args) throws Exception {
        TransactionsCommand.execute(args, namespace -> admin, out, time);
    }

    /**
     * Parses everything printed so far into rows of whitespace-separated cells.
     *
     * @return one list of cells per output line, in output order
     */
    private List<List<String>> readOutputAsTable() throws IOException {
        List<List<String>> table = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(outputStream.toByteArray())))) {
            for (List<String> row = readRow(reader); row != null; row = readRow(reader)) {
                table.add(row);
            }
        }
        return table;
    }

    /** Reads one line and splits it on runs of whitespace; returns {@code null} at end of input. */
    private List<String> readRow(BufferedReader reader) throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        return Arrays.asList(line.split("\\s+"));
    }

    /** Asserts that the command exited with status 0. */
    private void assertNormalExit() {
        Assertions.assertTrue(exitProcedure.hasExited());
        Assertions.assertEquals(0, exitProcedure.statusCode());
    }

    /** Runs the command with {@code args} and asserts that it exited with status 1. */
    private void assertCommandFailure(String[] args) throws Exception {
        execute(args);
        Assertions.assertTrue(exitProcedure.hasExited());
        Assertions.assertEquals(1, exitProcedure.statusCode());
    }
}
