package zipkin.kafka;

import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Span;
import zipkin.async.AsyncSpanConsumer;

/* loaded from: input_file:lib/transport-kafka-0.10.4.jar:zipkin/kafka/KafkaStreamProcessor.class */
/**
 * Runnable that walks a {@link KafkaStream} whose messages are lists of {@link Span} and passes
 * every non-empty list to an {@link AsyncSpanConsumer}.
 *
 * <p>A {@link RuntimeException} thrown by the consumer is logged at {@link Level#WARNING} and the
 * loop moves on to the next message.
 */
final class KafkaStreamProcessor implements Runnable {
    final Logger logger = Logger.getLogger(KafkaStreamProcessor.class.getName());
    final KafkaStream<String, List<Span>> stream;
    final AsyncSpanConsumer spanConsumer;

    /**
     * @param kafkaStream stream whose messages are read by {@link #run()}
     * @param asyncSpanConsumer receiver of each non-empty span list
     */
    public KafkaStreamProcessor(KafkaStream<String, List<Span>> kafkaStream, AsyncSpanConsumer asyncSpanConsumer) {
        this.stream = kafkaStream;
        this.spanConsumer = asyncSpanConsumer;
    }

    /**
     * Reads messages until the stream's iterator reports that none remain. Empty span lists are
     * skipped; all others are handed to the consumer together with
     * {@code AsyncSpanConsumer.NOOP_CALLBACK}.
     */
    @Override
    public void run() {
        ConsumerIterator<String, List<Span>> messages = stream.iterator();
        while (messages.hasNext()) {
            List<Span> spans = messages.next().message();
            if (spans.isEmpty()) {
                continue;
            }
            try {
                spanConsumer.accept(spans, AsyncSpanConsumer.NOOP_CALLBACK);
            } catch (RuntimeException e) {
                // Only the ids are written to the log, not the spans themselves.
                logger.log(Level.WARNING, describeFailure(spans), e);
            }
        }
    }

    /** Builds the warning text: a fixed prefix followed by comma-separated "traceId -> id" pairs. */
    private static String describeFailure(List<Span> spans) {
        StringBuilder text = new StringBuilder("unhandled error processing traceId -> spanId: ");
        String separator = "";
        for (Span span : spans) {
            text.append(separator).append(span.traceId).append(" -> ").append(span.id);
            separator = ",";
        }
        return text.toString();
    }
}
