package zipkin.kafka;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import zipkin.AsyncSpanConsumer;
import zipkin.CollectorMetrics;
import zipkin.CollectorSampler;
import zipkin.StorageComponent;
import zipkin.internal.Lazy;
import zipkin.internal.Util;

/* loaded from: input_file:lib/zipkin-transport-kafka-0.18.5.jar:zipkin/kafka/KafkaCollector.class */
/**
 * Consumes messages from a Kafka topic through the ZooKeeper-based consumer connector and hands
 * each stream to a {@code KafkaStreamProcessor} running on a dedicated thread pool.
 *
 * <p>Instances are created via {@link #builder()}. Stream processing is started from the
 * constructor, so a successfully built collector is already running; call {@link #close()} to
 * release the pool and the connector.
 */
public final class KafkaCollector implements AutoCloseable {
    final ZookeeperConsumerConnector connector;
    final ExecutorService pool;

    /** Configuration for {@link KafkaCollector}. Only {@link #zookeeper(String)} has no default. */
    public static final class Builder {
        String zookeeper;
        CollectorSampler sampler = CollectorSampler.ALWAYS_SAMPLE;
        CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
        String topic = "zipkin";
        String groupId = "zipkin";
        int streams = 1;
        int maxMessageSize = 1048576;

        /** Sets the sampler passed to the storage's span consumer. Defaults to always sampling. */
        public Builder sampler(CollectorSampler collectorSampler) {
            this.sampler = (CollectorSampler) Util.checkNotNull(collectorSampler, "sampler");
            return this;
        }

        /** Sets the metrics sink, scoped to the "kafka" transport. Defaults to no-op metrics. */
        public Builder metrics(CollectorMetrics collectorMetrics) {
            this.metrics = ((CollectorMetrics) Util.checkNotNull(collectorMetrics, "metrics")).forTransport("kafka");
            return this;
        }

        /** Sets the topic to consume from. Defaults to "zipkin". */
        public Builder topic(String str) {
            this.topic = (String) Util.checkNotNull(str, "topic");
            return this;
        }

        /** Sets the value used for the consumer's "zookeeper.connect" property. Required. */
        public Builder zookeeper(String str) {
            this.zookeeper = (String) Util.checkNotNull(str, "zookeeper");
            return this;
        }

        /** Sets the consumer group id. Defaults to "zipkin". */
        public Builder groupId(String str) {
            this.groupId = (String) Util.checkNotNull(str, "groupId");
            return this;
        }

        /**
         * Sets how many streams are requested for the topic, which is also the size of the
         * processing thread pool. Defaults to 1. Values below 1 are rejected when the collector
         * is built.
         */
        public Builder streams(int i) {
            this.streams = i;
            return this;
        }

        /** Sets the value used for "fetch.message.max.bytes". Defaults to 1048576. */
        public Builder maxMessageSize(int i) {
            this.maxMessageSize = i;
            return this;
        }

        /**
         * Creates a collector that writes to the given storage.
         *
         * <p>The storage's span consumer is wrapped in a {@link Lazy}, so
         * {@code asyncSpanConsumer} is not invoked here but when the lazy value is computed.
         *
         * @param storageComponent destination for collected spans; must not be null
         * @return a collector whose stream processors have already been submitted to its pool
         * @throws IllegalArgumentException if {@code streams} is less than 1
         */
        public KafkaCollector build(final StorageComponent storageComponent) {
            Util.checkNotNull(storageComponent, "storage");
            return new KafkaCollector(this, new Lazy<AsyncSpanConsumer>() {
                @Override // zipkin.internal.Lazy
                public AsyncSpanConsumer compute() {
                    return (AsyncSpanConsumer) Util.checkNotNull(storageComponent.asyncSpanConsumer(Builder.this.sampler, Builder.this.metrics), storageComponent + ".asyncSpanConsumer()");
                }
            });
        }

        Builder() {
        }
    }

    /** Returns a new builder populated with the default settings. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Connects the consumer, creates the worker pool and submits one processor per stream.
     *
     * @param builder source of the connection and sizing settings
     * @param lazy deferred span consumer handed to every stream processor
     * @throws IllegalArgumentException if {@code builder.streams} is less than 1
     */
    KafkaCollector(Builder builder, Lazy<AsyncSpanConsumer> lazy) {
        // Validate before acquiring anything: once the connector exists, any later failure
        // would otherwise leave it open.
        Util.checkNotNull(builder.zookeeper, "zookeeper");
        if (builder.streams < 1) {
            throw new IllegalArgumentException("streams < 1: " + builder.streams);
        }

        LinkedHashMap<String, Integer> topicCountMap = new LinkedHashMap<String, Integer>(1);
        topicCountMap.put(builder.topic, builder.streams);

        Properties properties = new Properties();
        properties.put("zookeeper.connect", builder.zookeeper);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, builder.groupId);
        properties.put("fetch.message.max.bytes", String.valueOf(builder.maxMessageSize));
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");

        this.connector = (ZookeeperConsumerConnector) Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(properties));
        this.pool = builder.streams == 1 ? Executors.newSingleThreadExecutor() : Executors.newFixedThreadPool(builder.streams);

        try {
            for (KafkaStream<byte[], byte[]> stream : this.connector.createMessageStreams(topicCountMap).get(builder.topic)) {
                this.pool.execute(new KafkaStreamProcessor(stream, lazy, builder.metrics));
            }
        } catch (RuntimeException | Error e) {
            // The caller never receives this instance, so nobody else can release these.
            close();
            throw e;
        }
    }

    /**
     * Initiates shutdown of the worker pool, then shuts down the consumer connector.
     *
     * <p>The pool is shut down without interruption; this method does not wait for submitted
     * processors to finish.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.pool.shutdown();
        this.connector.shutdown();
    }
}
