package zipkin.kafka;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import zipkin.AsyncSpanConsumer;
import zipkin.Sampler;
import zipkin.StorageComponent;
import zipkin.internal.Lazy;
import zipkin.internal.Util;

/* loaded from: input_file:zipkin/kafka/KafkaCollector.class */
public final class KafkaCollector implements AutoCloseable {
    final ConsumerConnector connector;
    final ExecutorService pool;

    /* loaded from: input_file:zipkin/kafka/KafkaCollector$Builder.class */
    public static final class Builder {
        String zookeeper;
        String topic = "zipkin";
        String groupId = "zipkin";
        int streams = 1;

        public Builder topic(String str) {
            this.topic = (String) Util.checkNotNull(str, "topic");
            return this;
        }

        public Builder zookeeper(String str) {
            this.zookeeper = (String) Util.checkNotNull(str, "zookeeper");
            return this;
        }

        public Builder groupId(String str) {
            this.groupId = (String) Util.checkNotNull(str, "groupId");
            return this;
        }

        public Builder streams(int i) {
            this.streams = i;
            return this;
        }

        public KafkaCollector writeTo(final StorageComponent storageComponent, final Sampler sampler) {
            Util.checkNotNull(storageComponent, "storage");
            Util.checkNotNull(sampler, "sampler");
            return new KafkaCollector(this, new Lazy<AsyncSpanConsumer>() { // from class: zipkin.kafka.KafkaCollector.Builder.1
                /* JADX INFO: Access modifiers changed from: protected */
                /* renamed from: compute, reason: merged with bridge method [inline-methods] */
                public AsyncSpanConsumer m0compute() {
                    return (AsyncSpanConsumer) Util.checkNotNull(storageComponent.asyncSpanConsumer(sampler), storageComponent + ".asyncSpanConsumer()");
                }
            });
        }
    }

    KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> lazy) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(1);
        linkedHashMap.put(builder.topic, Integer.valueOf(builder.streams));
        Properties properties = new Properties();
        properties.put("zookeeper.connect", builder.zookeeper);
        properties.put("group.id", builder.groupId);
        properties.put("auto.offset.reset", "smallest");
        this.connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        this.pool = builder.streams == 1 ? Executors.newSingleThreadExecutor() : Executors.newFixedThreadPool(builder.streams);
        ((List) this.connector.createMessageStreams(linkedHashMap, new StringDecoder((VerifiableProperties) null), new SpansDecoder()).get(builder.topic)).forEach(kafkaStream -> {
            this.pool.execute(new KafkaStreamProcessor(kafkaStream, lazy));
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.pool.shutdown();
        this.connector.shutdown();
    }
}
