package org.apache.kafka.connect.storage;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaStatusBackingStoreFormatTest.class */
public class KafkaStatusBackingStoreFormatTest {
    private static final String STATUS_TOPIC = "status-topic";
    private static final String FOO_TOPIC = "foo-topic";
    private static final String FOO_CONNECTOR = "foo-source";
    private static final String BAR_TOPIC = "bar-topic";
    private Time time;
    private KafkaStatusBackingStore store;
    private final KafkaBasedLog<String, byte[]> kafkaBasedLog = (KafkaBasedLog) Mockito.mock(KafkaBasedLog.class);

    @Before
    public void setup() {
        this.time = new MockTime();
        JsonConverter jsonConverter = new JsonConverter();
        jsonConverter.configure(Collections.singletonMap("schemas.enable", false), false);
        this.store = new KafkaStatusBackingStore(new MockTime(), jsonConverter, STATUS_TOPIC, () -> {
            return null;
        }, this.kafkaBasedLog);
    }

    @Test
    public void readInvalidStatus() {
        byte[] bArr = new byte[0];
        ConsumerRecord consumerRecord = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-unknown", bArr);
        Assert.assertTrue(this.store.connectors().isEmpty());
        Assert.assertTrue(this.store.tasks.isEmpty());
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord);
        Assert.assertTrue(this.store.connectors().isEmpty());
        Assert.assertTrue(this.store.tasks.isEmpty());
        Assert.assertTrue(this.store.topics.isEmpty());
        ConsumerRecord consumerRecord2 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-connector-", bArr);
        Assert.assertTrue(this.store.connectors().isEmpty());
        this.store.read(consumerRecord2);
        Assert.assertTrue(this.store.connectors().isEmpty());
        ConsumerRecord consumerRecord3 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-task-", bArr);
        Assert.assertTrue(this.store.tasks.isEmpty());
        this.store.read(consumerRecord3);
        Assert.assertTrue(this.store.tasks.isEmpty());
        ConsumerRecord consumerRecord4 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-task-foo-source-#", bArr);
        Assert.assertTrue(this.store.tasks.isEmpty());
        this.store.read(consumerRecord4);
        Assert.assertTrue(this.store.tasks.isEmpty());
        ConsumerRecord consumerRecord5 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-", bArr);
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord5);
        Assert.assertTrue(this.store.topics.isEmpty());
        ConsumerRecord consumerRecord6 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-:connector-", bArr);
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord6);
        Assert.assertTrue(this.store.topics.isEmpty());
        ConsumerRecord consumerRecord7 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:", bArr);
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord7);
        Assert.assertTrue(this.store.topics.isEmpty());
        ConsumerRecord consumerRecord8 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:connector-", bArr);
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord8);
        Assert.assertTrue(this.store.topics.isEmpty());
    }

    @Test
    public void readInvalidStatusValue() {
        byte[] bytes = "invalid".getBytes();
        ConsumerRecord consumerRecord = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-connector-foo-source", bytes);
        Assert.assertTrue(this.store.connectors().isEmpty());
        this.store.read(consumerRecord);
        Assert.assertTrue(this.store.connectors().isEmpty());
        ConsumerRecord consumerRecord2 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-task-foo-source-0", bytes);
        Assert.assertTrue(this.store.tasks.isEmpty());
        this.store.read(consumerRecord2);
        Assert.assertTrue(this.store.tasks.isEmpty());
        ConsumerRecord consumerRecord3 = new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:connector-foo-source", bytes);
        Assert.assertTrue(this.store.topics.isEmpty());
        this.store.read(consumerRecord3);
        Assert.assertTrue(this.store.topics.isEmpty());
    }

    @Test
    public void readTopicStatus() {
        TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), Time.SYSTEM.milliseconds());
        this.store.read(new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:connector-foo-source", this.store.serializeTopicStatus(topicStatus)));
        Assert.assertTrue(this.store.topics.containsKey(FOO_CONNECTOR));
        Assert.assertTrue(((ConcurrentMap) this.store.topics.get(FOO_CONNECTOR)).containsKey(FOO_TOPIC));
        Assert.assertEquals(topicStatus, ((ConcurrentMap) this.store.topics.get(FOO_CONNECTOR)).get(FOO_TOPIC));
    }

    @Test
    public void deleteTopicStatus() {
        TopicStatus topicStatus = new TopicStatus("foo", new ConnectorTaskId("bar", 0), Time.SYSTEM.milliseconds());
        ((ConcurrentMap) this.store.topics.computeIfAbsent("bar", str -> {
            return new ConcurrentHashMap();
        })).put("foo", topicStatus);
        Assert.assertTrue(this.store.topics.containsKey("bar"));
        Assert.assertTrue(((ConcurrentMap) this.store.topics.get("bar")).containsKey("foo"));
        Assert.assertEquals(topicStatus, ((ConcurrentMap) this.store.topics.get("bar")).get("foo"));
        this.store.read(new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo:connector-bar", this.store.serializeTopicStatus((TopicStatus) null)));
        Assert.assertTrue(this.store.topics.containsKey("bar"));
        Assert.assertFalse(((ConcurrentMap) this.store.topics.get("bar")).containsKey("foo"));
        Assert.assertEquals(Collections.emptyMap(), this.store.topics.get("bar"));
    }

    @Test
    public void putTopicState() {
        TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), this.time.milliseconds());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-topic-foo-topic:connector-foo-source"), forClass.capture(), (Callback) Mockito.any(Callback.class));
        this.store.put(topicStatus);
        Assert.assertEquals(topicStatus, this.store.parseTopicStatus((byte[]) forClass.getValue()));
        Assert.assertNull(this.store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
        this.store.read(new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:connector-foo-source", forClass.getValue()));
        Assert.assertEquals(topicStatus, this.store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
        Assert.assertEquals(Collections.singleton(topicStatus), new HashSet(this.store.getAllTopics(FOO_CONNECTOR)));
    }

    @Test
    public void putTopicStateRetriableFailure() {
        TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), this.time.milliseconds());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, new TimeoutException());
            return null;
        }).doAnswer(invocationOnMock2 -> {
            ((Callback) invocationOnMock2.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-topic-foo-topic:connector-foo-source"), forClass.capture(), (Callback) Mockito.any(Callback.class));
        this.store.put(topicStatus);
        ((KafkaBasedLog) Mockito.verify(this.kafkaBasedLog, Mockito.timeout(1000L).times(2))).send(Mockito.any(), Mockito.any(), (Callback) Mockito.any());
        Assert.assertEquals(topicStatus, this.store.parseTopicStatus((byte[]) forClass.getValue()));
        Assert.assertNull(this.store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
    }

    @Test
    public void putTopicStateNonRetriableFailure() {
        TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), this.time.milliseconds());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, new UnknownServerException());
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-topic-foo-topic:connector-foo-source"), forClass.capture(), (Callback) Mockito.any(Callback.class));
        this.store.put(topicStatus);
        Assert.assertEquals(topicStatus, this.store.parseTopicStatus((byte[]) forClass.getValue()));
        Assert.assertNull(this.store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
    }

    @Test
    public void putTopicStateShouldOverridePreviousState() {
        TopicStatus topicStatus = new TopicStatus(FOO_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), this.time.milliseconds());
        this.time.sleep(1000L);
        TopicStatus topicStatus2 = new TopicStatus(BAR_TOPIC, new ConnectorTaskId(FOO_CONNECTOR, 0), this.time.milliseconds());
        String str = "status-topic-bar-topic:connector-foo-source";
        ArgumentCaptor forClass = ArgumentCaptor.forClass(byte[].class);
        ((KafkaBasedLog) Mockito.doAnswer(invocationOnMock -> {
            ((Callback) invocationOnMock.getArgument(2)).onCompletion((RecordMetadata) null, (Exception) null);
            this.store.read(new ConsumerRecord(STATUS_TOPIC, 0, 0L, str, forClass.getValue()));
            return null;
        }).when(this.kafkaBasedLog)).send(Mockito.eq("status-topic-bar-topic:connector-foo-source"), forClass.capture(), (Callback) Mockito.any(Callback.class));
        this.store.read(new ConsumerRecord(STATUS_TOPIC, 0, 0L, "status-topic-foo-topic:connector-foo-source", this.store.serializeTopicStatus(topicStatus)));
        this.store.put(topicStatus2);
        Assert.assertEquals(topicStatus2, this.store.parseTopicStatus((byte[]) forClass.getValue()));
        Assert.assertEquals(topicStatus, this.store.getTopic(FOO_CONNECTOR, FOO_TOPIC));
        Assert.assertEquals(topicStatus2, this.store.getTopic(FOO_CONNECTOR, BAR_TOPIC));
        Assert.assertEquals(new HashSet(Arrays.asList(topicStatus, topicStatus2)), new HashSet(this.store.getAllTopics(FOO_CONNECTOR)));
    }
}
