package org.apache.kafka.common.telemetry.internals;

import io.opentelemetry.proto.common.v1.KeyValue;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsRequest;
import org.apache.kafka.common.requests.GetTelemetrySubscriptionsResponse;
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.PushTelemetryResponse;
import org.apache.kafka.common.telemetry.ClientTelemetryState;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/common/telemetry/internals/ClientTelemetryReporterTest.class */
public class ClientTelemetryReporterTest {
    private MockTime time;
    private ClientTelemetryReporter clientTelemetryReporter;
    private Map<String, Object> configs;
    private MetricsContext metricsContext;
    private Uuid uuid;
    private ClientTelemetryReporter.ClientTelemetrySubscription subscription;

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        this.clientTelemetryReporter = new ClientTelemetryReporter(this.time);
        this.configs = new HashMap();
        this.metricsContext = new KafkaMetricsContext("test");
        this.uuid = Uuid.randomUuid();
        this.subscription = new ClientTelemetryReporter.ClientTelemetrySubscription(this.uuid, 1234, 20000, Collections.emptyList(), true, (Predicate) null);
    }

    @Test
    public void testInitTelemetryReporter() {
        this.configs.put("client.id", "test-client");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        Assertions.assertNotNull(this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull(this.clientTelemetryReporter.telemetryProvider().resource());
        Assertions.assertEquals(1, this.clientTelemetryReporter.telemetryProvider().resource().getAttributesCount());
        Assertions.assertEquals("client_rack", this.clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getKey());
        Assertions.assertEquals("rack", this.clientTelemetryReporter.telemetryProvider().resource().getAttributes(0).getValue().getStringValue());
    }

    @Test
    public void testInitTelemetryReporterNoCollector() {
        MetricsContext metricsContext = Collections::emptyMap;
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(metricsContext);
        Assertions.assertNull(this.clientTelemetryReporter.metricsCollector());
    }

    @Test
    public void testProducerLabels() {
        this.configs.put("client.id", "test-client");
        this.configs.put("group.id", "group-id");
        this.configs.put("group.instance.id", "group-instance-id");
        this.configs.put("transactional.id", "transaction-id");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.producer"));
        Assertions.assertNotNull(this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull(this.clientTelemetryReporter.telemetryProvider().resource());
        List attributesList = this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
        Assertions.assertEquals(2, attributesList.size());
        attributesList.forEach(keyValue -> {
            if (keyValue.getKey().equals("client_rack")) {
                Assertions.assertEquals("rack", keyValue.getValue().getStringValue());
            } else if (keyValue.getKey().equals("transactional_id")) {
                Assertions.assertEquals("transaction-id", keyValue.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testConsumerLabels() {
        this.configs.put("client.id", "test-client");
        this.configs.put("group.id", "group-id");
        this.configs.put("group.instance.id", "group-instance-id");
        this.configs.put("transactional.id", "transaction-id");
        this.configs.put("client.rack", "rack");
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(new KafkaMetricsContext("kafka.consumer"));
        Assertions.assertNotNull(this.clientTelemetryReporter.metricsCollector());
        Assertions.assertNotNull(this.clientTelemetryReporter.telemetryProvider().resource());
        List attributesList = this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList();
        Assertions.assertEquals(3, attributesList.size());
        attributesList.forEach(keyValue -> {
            if (keyValue.getKey().equals("client_rack")) {
                Assertions.assertEquals("rack", keyValue.getValue().getStringValue());
            } else if (keyValue.getKey().equals("group_id")) {
                Assertions.assertEquals("group-id", keyValue.getValue().getStringValue());
            } else if (keyValue.getKey().equals("group_instance_id")) {
                Assertions.assertEquals("group-instance-id", keyValue.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testTelemetryReporterClose() {
        this.clientTelemetryReporter.close();
        Assertions.assertEquals(ClientTelemetryState.TERMINATED, this.clientTelemetryReporter.telemetrySender().state());
    }

    @Test
    public void testTelemetryReporterCloseMultipleTimesNoException() {
        this.clientTelemetryReporter.close();
        this.clientTelemetryReporter.close();
        Assertions.assertEquals(ClientTelemetryState.TERMINATED, this.clientTelemetryReporter.telemetrySender().state());
    }

    @Test
    public void testUpdateMetricsLabels() {
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        Assertions.assertTrue(this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().isEmpty());
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key1", "value1"));
        Assertions.assertEquals(1, this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        Assertions.assertEquals("key1", ((KeyValue) this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0)).getKey());
        Assertions.assertEquals("value1", ((KeyValue) this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().get(0)).getValue().getStringValue());
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "value2"));
        Assertions.assertEquals(2, this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(keyValue -> {
            if (keyValue.getKey().equals("key1")) {
                Assertions.assertEquals("value1", keyValue.getValue().getStringValue());
            } else {
                Assertions.assertEquals("key2", keyValue.getKey());
                Assertions.assertEquals("value2", keyValue.getValue().getStringValue());
            }
        });
        this.clientTelemetryReporter.updateMetricsLabels(Collections.singletonMap("key2", "valueUpdated"));
        Assertions.assertEquals(2, this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().size());
        this.clientTelemetryReporter.telemetryProvider().resource().getAttributesList().forEach(keyValue2 -> {
            if (keyValue2.getKey().equals("key1")) {
                Assertions.assertEquals("value1", keyValue2.getValue().getStringValue());
            } else {
                Assertions.assertEquals("key2", keyValue2.getKey());
                Assertions.assertEquals("valueUpdated", keyValue2.getValue().getStringValue());
            }
        });
    }

    @Test
    public void testTelemetrySenderTimeToNextUpdate() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(0L, telemetrySender.timeToNextUpdate(100L));
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertEquals(20000.0f, (float) telemetrySender.timeToNextUpdate(100L), 200.0f);
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertEquals(100L, telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        long timeToNextUpdate = telemetrySender.timeToNextUpdate(100L);
        Assertions.assertTrue(timeToNextUpdate > 0 && ((double) timeToNextUpdate) >= 0.5d * ((double) timeToNextUpdate) && ((double) timeToNextUpdate) <= 1.5d * ((double) timeToNextUpdate));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        Assertions.assertEquals(100L, telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
        Assertions.assertEquals(0L, telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
        Assertions.assertEquals(Long.MAX_VALUE, telemetrySender.timeToNextUpdate(100L));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
        Assertions.assertThrows(IllegalStateException.class, () -> {
            telemetrySender.timeToNextUpdate(100L);
        });
    }

    @Test
    public void testCreateRequestSubscriptionNeeded() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Optional createRequest = telemetrySender.createRequest();
        Assertions.assertNotNull(createRequest);
        Assertions.assertTrue(createRequest.isPresent());
        Assertions.assertTrue(((AbstractRequest.Builder) createRequest.get()).build() instanceof GetTelemetrySubscriptionsRequest);
        Assertions.assertEquals(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(Uuid.ZERO_UUID), true).build().data(), ((AbstractRequest.Builder) createRequest.get()).build().data());
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
    }

    @Test
    public void testCreateRequestSubscriptionNeededAfterExistingSubscription() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Optional createRequest = telemetrySender.createRequest();
        Assertions.assertNotNull(createRequest);
        Assertions.assertTrue(createRequest.isPresent());
        Assertions.assertTrue(((AbstractRequest.Builder) createRequest.get()).build() instanceof GetTelemetrySubscriptionsRequest);
        Assertions.assertEquals(new GetTelemetrySubscriptionsRequest.Builder(new GetTelemetrySubscriptionsRequestData().setClientInstanceId(this.subscription.clientInstanceId()), true).build().data(), ((AbstractRequest.Builder) createRequest.get()).build().data());
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS, telemetrySender.state());
    }

    @Test
    public void testCreateRequestPushNeeded() {
        this.clientTelemetryReporter.configure(this.configs);
        this.clientTelemetryReporter.contextChange(this.metricsContext);
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        telemetrySender.createRequest();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Optional createRequest = telemetrySender.createRequest();
        Assertions.assertNotNull(createRequest);
        Assertions.assertTrue(createRequest.isPresent());
        Assertions.assertTrue(((AbstractRequest.Builder) createRequest.get()).build() instanceof PushTelemetryRequest);
        Assertions.assertEquals(new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setClientInstanceId(this.subscription.clientInstanceId()).setSubscriptionId(this.subscription.subscriptionId()), true).build().data(), ((AbstractRequest.Builder) createRequest.get()).build().data());
        Assertions.assertEquals(ClientTelemetryState.PUSH_IN_PROGRESS, telemetrySender.state());
    }

    @Test
    public void testCreateRequestPushNeededWithoutSubscription() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.createRequest();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Optional createRequest = telemetrySender.createRequest();
        Assertions.assertNotNull(createRequest);
        Assertions.assertFalse(createRequest.isPresent());
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
    }

    @Test
    public void testCreateRequestInvalidState() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_NEEDED));
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATING_PUSH_IN_PROGRESS));
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.TERMINATED));
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
    }

    @Test
    public void testCreateRequestPushNoCollector() {
        long milliseconds = this.time.milliseconds();
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.createRequest();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        telemetrySender.updateSubscriptionResult(this.subscription, milliseconds);
        long timeToNextUpdate = telemetrySender.timeToNextUpdate(100L);
        Assertions.assertTrue(timeToNextUpdate > 0 && timeToNextUpdate != 2000 && ((double) timeToNextUpdate) >= 0.5d * ((double) timeToNextUpdate) && ((double) timeToNextUpdate) <= 1.5d * ((double) timeToNextUpdate));
        this.time.sleep(1000L);
        Assertions.assertFalse(telemetrySender.createRequest().isPresent());
        Assertions.assertEquals(20000L, telemetrySender.timeToNextUpdate(100L));
        Assertions.assertEquals(milliseconds + 1000, telemetrySender.lastRequestMs());
    }

    @Test
    public void testHandleResponseGetSubscriptions() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid randomUuid = Uuid.randomUuid();
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(randomUuid).setSubscriptionId(5678).setAcceptedCompressionTypes(Collections.singletonList(Byte.valueOf(CompressionType.GZIP.id))).setPushIntervalMs(20000).setRequestedMetrics(Collections.singletonList("*"))));
        Assertions.assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
        Assertions.assertNotNull(subscription);
        Assertions.assertEquals(randomUuid, subscription.clientInstanceId());
        Assertions.assertEquals(5678, subscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
        Assertions.assertEquals(20000, subscription.pushIntervalMs());
        Assertions.assertEquals(ClientTelemetryUtils.SELECTOR_ALL_METRICS, subscription.selector());
    }

    @Test
    public void testHandleResponseGetSubscriptionsWithoutMetrics() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid randomUuid = Uuid.randomUuid();
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(randomUuid).setSubscriptionId(5678).setAcceptedCompressionTypes(Collections.singletonList(Byte.valueOf(CompressionType.GZIP.id))).setPushIntervalMs(20000)));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
        Assertions.assertNotNull(subscription);
        Assertions.assertEquals(randomUuid, subscription.clientInstanceId());
        Assertions.assertEquals(5678, subscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.GZIP), subscription.acceptedCompressionTypes());
        Assertions.assertEquals(20000, subscription.pushIntervalMs());
        Assertions.assertEquals(ClientTelemetryUtils.SELECTOR_NO_METRICS, subscription.selector());
    }

    @Test
    public void testHandleResponseGetTelemetryErrorResponse() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(300000L, telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.INVALID_REQUEST.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.enabled(true);
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        telemetrySender.enabled(true);
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
    }

    @Test
    public void testHandleResponseSubscriptionChange() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        KafkaMetricsCollector kafkaMetricsCollector = (KafkaMetricsCollector) Mockito.mock(KafkaMetricsCollector.class);
        this.clientTelemetryReporter.metricsCollector(kafkaMetricsCollector);
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Uuid randomUuid = Uuid.randomUuid();
        telemetrySender.handleResponse(new GetTelemetrySubscriptionsResponse(new GetTelemetrySubscriptionsResponseData().setClientInstanceId(randomUuid).setSubscriptionId(15678).setAcceptedCompressionTypes(Collections.singletonList(Byte.valueOf(CompressionType.ZSTD.id))).setPushIntervalMs(10000).setDeltaTemporality(false).setRequestedMetrics(Collections.singletonList("org.apache.kafka.producer"))));
        Assertions.assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
        ClientTelemetryReporter.ClientTelemetrySubscription subscription = telemetrySender.subscription();
        Assertions.assertNotNull(subscription);
        Assertions.assertEquals(randomUuid, subscription.clientInstanceId());
        Assertions.assertEquals(15678, subscription.subscriptionId());
        Assertions.assertEquals(Collections.singletonList(CompressionType.ZSTD), subscription.acceptedCompressionTypes());
        Assertions.assertEquals(10000, subscription.pushIntervalMs());
        Assertions.assertFalse(subscription.deltaTemporality());
        Assertions.assertTrue(subscription.selector().test(new MetricKey("org.apache.kafka.producer")));
        Assertions.assertTrue(subscription.selector().test(new MetricKey("org.apache.kafka.producerabc")));
        Assertions.assertTrue(subscription.selector().test(new MetricKey("org.apache.kafka.producer.abc")));
        Assertions.assertFalse(subscription.selector().test(new MetricKey("org.apache.kafka.produce")));
        ((KafkaMetricsCollector) Mockito.verify(kafkaMetricsCollector, Mockito.times(1))).metricsReset();
    }

    @Test
    public void testHandleResponsePushTelemetry() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData()));
        Assertions.assertEquals(ClientTelemetryState.PUSH_NEEDED, telemetrySender.state());
        Assertions.assertEquals(this.subscription.pushIntervalMs(), telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
    }

    @Test
    public void testHandleResponsePushTelemetryErrorResponse() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SUBSCRIPTION_ID.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(0L, telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_COMPRESSION_TYPE.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(0L, telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.TELEMETRY_TOO_LARGE.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(20000L, telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.THROTTLING_QUOTA_EXCEEDED.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(20000L, telemetrySender.intervalMs());
        Assertions.assertTrue(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.INVALID_REQUEST.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.INVALID_RECORD.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_NEEDED));
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.PUSH_IN_PROGRESS));
        telemetrySender.enabled(true);
        telemetrySender.handleResponse(new PushTelemetryResponse(new PushTelemetryResponseData().setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())));
        Assertions.assertEquals(ClientTelemetryState.SUBSCRIPTION_NEEDED, telemetrySender.state());
        Assertions.assertEquals(2147483647L, telemetrySender.intervalMs());
        Assertions.assertFalse(telemetrySender.enabled());
    }

    @Test
    public void testClientInstanceId() throws InterruptedException {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertTrue(telemetrySender.maybeSetState(ClientTelemetryState.SUBSCRIPTION_IN_PROGRESS));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        AtomicReference atomicReference = new AtomicReference();
        new Thread(() -> {
            try {
                atomicReference.set(telemetrySender.clientInstanceId(Duration.ofMillis(10000L)));
            } finally {
                countDownLatch.countDown();
            }
        }).start();
        new Thread(() -> {
            try {
                telemetrySender.updateSubscriptionResult(this.subscription, this.time.milliseconds());
            } finally {
                countDownLatch.countDown();
            }
        }).start();
        Assertions.assertTrue(countDownLatch.await(2000L, TimeUnit.MILLISECONDS));
        Assertions.assertNotNull(atomicReference.get());
        Assertions.assertTrue(((Optional) atomicReference.get()).isPresent());
        Assertions.assertEquals(this.uuid, ((Optional) atomicReference.get()).get());
    }

    @Test
    public void testComputeStaggeredIntervalMs() {
        ClientTelemetryReporter.DefaultClientTelemetrySender telemetrySender = this.clientTelemetryReporter.telemetrySender();
        Assertions.assertEquals(0, telemetrySender.computeStaggeredIntervalMs(0, 0.5d, 1.5d));
        Assertions.assertEquals(1, telemetrySender.computeStaggeredIntervalMs(1, 0.99d, 1.0d));
        long computeStaggeredIntervalMs = telemetrySender.computeStaggeredIntervalMs(1000, 0.5d, 1.5d);
        Assertions.assertTrue(computeStaggeredIntervalMs >= 500 && computeStaggeredIntervalMs <= 1500);
    }

    @AfterEach
    public void tearDown() {
        this.clientTelemetryReporter.close();
    }
}
