package zipkin.kafka;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import zipkin.async.AsyncSpanConsumer;

/* loaded from: input_file:lib/transport-kafka-0.10.1.jar:zipkin/kafka/KafkaTransport.class */
/**
 * Connects a Kafka consumer to an {@link AsyncSpanConsumer}.
 *
 * <p>On construction this opens a consumer connector, requests
 * {@code kafkaConfig.streams} message streams for {@code kafkaConfig.topic}, and submits one
 * {@code KafkaStreamProcessor} per stream to a thread pool of the same size. Call
 * {@link #close()} to shut down both the pool and the connector.
 */
public final class KafkaTransport implements AutoCloseable {
    /** Kafka consumer connector that supplies the message streams. */
    final ConsumerConnector connector;
    /** Executor running one {@code KafkaStreamProcessor} task per message stream. */
    final ExecutorService pool;

    /**
     * Creates the transport and starts processing immediately.
     *
     * <p>If any step after the pool is created fails, the resources allocated so far are shut
     * down before the failure is rethrown, since the caller never gets an instance to close.
     *
     * @param kafkaConfig supplies the topic name, the stream count, and the consumer settings
     * @param asyncSpanConsumer passed to every {@code KafkaStreamProcessor} that is created
     */
    public KafkaTransport(KafkaConfig kafkaConfig, AsyncSpanConsumer asyncSpanConsumer) {
        ExecutorService executor = kafkaConfig.streams == 1
                ? Executors.newSingleThreadExecutor()
                : Executors.newFixedThreadPool(kafkaConfig.streams);
        ConsumerConnector consumer = null;
        try {
            consumer = Consumer.createJavaConsumerConnector(kafkaConfig.forConsumer());

            // Ask for `streams` streams on the single configured topic.
            LinkedHashMap<String, Integer> topicCountMap = new LinkedHashMap<>(1);
            topicCountMap.put(kafkaConfig.topic, kafkaConfig.streams);

            consumer.createMessageStreams(topicCountMap, new StringDecoder(null), new SpansDecoder())
                    .get(kafkaConfig.topic)
                    .forEach(stream -> executor.execute(new KafkaStreamProcessor(stream, asyncSpanConsumer)));
        } catch (RuntimeException | Error e) {
            // Construction failed part-way: release what was allocated so threads and
            // connections do not outlive an object the caller never receives.
            executor.shutdown();
            if (consumer != null) {
                try {
                    consumer.shutdown();
                } catch (RuntimeException suppressed) {
                    // Keep the original failure as the primary exception.
                    e.addSuppressed(suppressed);
                }
            }
            throw e;
        }
        this.pool = executor;
        this.connector = consumer;
    }

    /**
     * Shuts down the thread pool and then the consumer connector.
     *
     * <p>{@link ExecutorService#shutdown()} only stops new submissions; it does not interrupt
     * tasks that are already running.
     */
    @Override
    public void close() {
        this.pool.shutdown();
        this.connector.shutdown();
    }
}
