package zipkin.collector.kafka10;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.collector.kafka10.KafkaCollector;
import zipkin.storage.Callback;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:zipkin/collector/kafka10/KafkaCollectorWorker.class */
public final class KafkaCollectorWorker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCollectorWorker.class);
    final Consumer<byte[], byte[]> kafkaConsumer;
    final Collector collector;
    final CollectorMetrics metrics;
    final AtomicReference<List<TopicPartition>> assignedPartitions = new AtomicReference<>(Collections.emptyList());

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaCollectorWorker(KafkaCollector.Builder builder) {
        this.kafkaConsumer = new KafkaConsumer(builder.properties);
        this.kafkaConsumer.subscribe(Arrays.asList(builder.topic.split(",")), new ConsumerRebalanceListener() { // from class: zipkin.collector.kafka10.KafkaCollectorWorker.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                KafkaCollectorWorker.this.assignedPartitions.set(Collections.emptyList());
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                KafkaCollectorWorker.this.assignedPartitions.set(Collections.unmodifiableList(new ArrayList(collection)));
            }
        });
        this.collector = builder.delegate.build();
        this.metrics = builder.metrics;
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            LOG.info("Kafka consumer starting polling loop.");
            while (true) {
                ConsumerRecords poll = this.kafkaConsumer.poll(1000L);
                LOG.debug("Kafka polling returned batch of {} messages.", Integer.valueOf(poll.count()));
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    this.metrics.incrementMessages();
                    byte[] bArr = (byte[]) consumerRecord.value();
                    if (bArr.length == 0) {
                        this.metrics.incrementMessagesDropped();
                    } else if (bArr[0] == 91) {
                        this.collector.acceptSpans(bArr, Codec.JSON, Callback.NOOP);
                    } else if (bArr[0] == 12) {
                        this.collector.acceptSpans(bArr, Codec.THRIFT, Callback.NOOP);
                    } else {
                        this.collector.acceptSpans(Collections.singletonList(bArr), Codec.THRIFT, Callback.NOOP);
                    }
                }
            }
        } catch (Throwable th) {
            LOG.info("Kafka consumer polling loop stopped.");
            LOG.info("Closing Kafka consumer...");
            this.kafkaConsumer.close();
            LOG.info("Kafka consumer closed.");
            throw th;
        }
    }
}
