package zipkin.storage.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ContiguousSet;
import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.tags.form.ErrorsTag;
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.internal.CorrectForClockSkew;
import zipkin.internal.Dependencies;
import zipkin.internal.DependencyLinker;
import zipkin.internal.GroupByTraceId;
import zipkin.internal.MergeById;
import zipkin.internal.Nullable;
import zipkin.internal.Util;
import zipkin.storage.QueryRequest;
import zipkin.storage.guava.GuavaSpanStore;

/* loaded from: input_file:BOOT-INF/lib/io.zipkin.java-zipkin-storage-cassandra-2.4.5.jar:zipkin/storage/cassandra/CassandraSpanStore.class */
public final class CassandraSpanStore implements GuavaSpanStore {
    // Logger for this store; the constructor uses it to warn about old protocol versions.
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CassandraSpanStore.class);
    // Already-completed future holding an empty list; getSpanNames returns it for a null or empty service name.
    static final ListenableFuture<List<String>> EMPTY_LIST = Futures.immediateFuture(Collections.emptyList());
    // Bound as "limit_" when selecting rows from the "traces" table by trace id.
    private final int maxTraceCols;
    // getTraces multiplies the request limit by this value to size its index queries.
    private final int indexFetchMultiplier;
    // Passed to GroupByTraceId in getTraces; when true, getRawTrace also drops spans whose
    // non-zero traceIdHigh differs from the requested one.
    private final boolean strictTraceId;
    // Cassandra session used to prepare and execute every statement below.
    private final Session session;
    // Built from the session's protocol version; serializes/deserializes the "ts" column values.
    private final TimestampCodec timestampCodec;
    // The integers [0, bucketCount) from the constructor; bound to the "bucket" IN clauses.
    private final Set<Integer> buckets;
    // Statements prepared once in the constructor.
    private final PreparedStatement selectTraces;
    private final PreparedStatement selectDependencies;
    private final PreparedStatement selectServiceNames;
    private final PreparedStatement selectSpanNames;
    private final PreparedStatement selectTraceIdsByServiceName;
    // Null when the protocol version is below V4 (see the constructor).
    private final PreparedStatement selectTraceIdsByServiceNames;
    private final PreparedStatement selectTraceIdsBySpanName;
    private final PreparedStatement selectTraceIdsByAnnotation;
    // Converts ("trace_id", "ts") rows into an insertion-ordered map of trace id to timestamp.
    private final Function<ResultSet, Map<Long, Long>> traceIdToTimestamp;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: zipkin.storage.cassandra.CassandraSpanStore$3, reason: invalid class name */
    /* loaded from: input_file:BOOT-INF/lib/io.zipkin.java-zipkin-storage-cassandra-2.4.5.jar:zipkin/storage/cassandra/CassandraSpanStore$3.class */
    /**
     * Last stage of {@code getTraces}: turns the candidate trace ids into grouped traces.
     *
     * <p>At most {@code request.limit} ids are taken (in the set's iteration order), their spans
     * are fetched via {@code getSpansByTraceIds}, grouped with {@code GroupByTraceId}, and a group
     * is kept when its first span has {@code traceIdHigh == 0} or the request's {@code test}
     * accepts it.
     */
    public class AnonymousClass3 implements AsyncFunction<Set<Long>, List<List<Span>>> {
        final /* synthetic */ QueryRequest val$request;

        AnonymousClass3(QueryRequest queryRequest) {
            this.val$request = queryRequest;
        }

        /**
         * @param set candidate trace ids; dereferenced without a null check
         * @return future of the grouped, filtered traces
         */
        @Override
        public ListenableFuture<List<List<Span>>> apply(@Nullable Set<Long> set) {
            Iterator<Long> firstIds = Iterators.limit(set.iterator(), this.val$request.limit);
            Set<Long> limitedIds = ImmutableSet.copyOf(firstIds);
            ListenableFuture<List<Span>> spans =
                    CassandraSpanStore.this.getSpansByTraceIds(limitedIds, CassandraSpanStore.this.maxTraceCols);

            // Keeps a trace when its first span has no high trace-id bits or the request accepts it.
            final Predicate<List<Span>> keepTrace = new Predicate<List<Span>>() {
                @Override
                public boolean apply(List<Span> trace) {
                    if (trace.get(0).traceIdHigh == 0) {
                        return true;
                    }
                    return AnonymousClass3.this.val$request.test(trace);
                }
            };

            Function<List<Span>, List<List<Span>>> groupAndFilter = new Function<List<Span>, List<List<Span>>>() {
                @Override
                public List<List<Span>> apply(@Nullable List<Span> fetched) {
                    return FluentIterable.from(GroupByTraceId.apply(fetched, CassandraSpanStore.this.strictTraceId, true))
                            .filter(keepTrace)
                            .toList();
                }
            };
            return Futures.transform(spans, groupAndFilter);
        }

        @Override
        public String toString() {
            return "getSpansByTraceIds";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/io.zipkin.java-zipkin-storage-cassandra-2.4.5.jar:zipkin/storage/cassandra/CassandraSpanStore$AdjustTrace.class */
    /**
     * Post-processes a raw trace: merges spans via {@code MergeById}, then applies
     * {@code CorrectForClockSkew}.
     */
    public enum AdjustTrace implements Function<Collection<Span>, List<Span>> {
        INSTANCE;

        /**
         * @param collection raw spans of one trace; may be {@code null}, which is what
         *     {@code getRawTrace} yields when no spans were found
         * @return the merged and skew-corrected spans, or {@code null} when the input is
         *     {@code null} or the adjusted result is empty
         */
        @Override
        public List<Span> apply(@Nullable Collection<Span> collection) {
            // A missing trace arrives here as null; answer directly rather than depending on
            // the merge/skew helpers tolerating a null collection.
            if (collection == null) {
                return null;
            }
            List<Span> adjusted = CorrectForClockSkew.apply(MergeById.apply(collection));
            return adjusted.isEmpty() ? null : adjusted;
        }

        @Override
        public String toString() {
            return "AdjustTrace";
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/io.zipkin.java-zipkin-storage-cassandra-2.4.5.jar:zipkin/storage/cassandra/CassandraSpanStore$ConvertDependenciesResponse.class */
    /**
     * Decodes the "dependencies" column of every returned row and merges all decoded links
     * with {@code DependencyLinker.merge}.
     */
    enum ConvertDependenciesResponse implements Function<ResultSet, List<DependencyLink>> {
        INSTANCE;

        /**
         * @param resultSet rows containing a thrift-encoded "dependencies" column; dereferenced
         *     without a null check
         * @return the merged dependency links across all rows
         */
        @Override
        public List<DependencyLink> apply(@Nullable ResultSet resultSet) {
            // Typed builder: each element added is a DependencyLink, never a builder.
            ImmutableList.Builder<DependencyLink> links = ImmutableList.builder();
            for (Row row : resultSet) {
                links.addAll(Dependencies.fromThrift(row.getBytes("dependencies")).links);
            }
            return DependencyLinker.merge(links.build());
        }

        @Override
        public String toString() {
            return "MergeDependencies";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the configuration, derives the timestamp codec from the session's protocol
     * version, and prepares every query this store issues.
     *
     * @param session Cassandra session used to prepare and execute statements
     * @param i number of buckets; the integers {@code [0, i)} are bound to "bucket" IN clauses
     * @param i2 stored as {@code maxTraceCols}
     * @param i3 stored as {@code indexFetchMultiplier}
     * @param z stored as {@code strictTraceId}
     */
    public CassandraSpanStore(Session session, int i, int i2, int i3, boolean z) {
        this.session = session;
        this.maxTraceCols = i2;
        this.indexFetchMultiplier = i3;
        this.strictTraceId = z;
        ProtocolVersion protocolVersion = session.getCluster().getConfiguration().getProtocolOptions().getProtocolVersion();
        this.timestampCodec = new TimestampCodec(protocolVersion);
        this.buckets = ContiguousSet.create(Range.closedOpen(0, i), DiscreteDomain.integers());

        // The column is literally named "span"; do not borrow an unrelated web-framework
        // constant that merely happens to have the same value.
        this.selectTraces = session.prepare(
                QueryBuilder.select("trace_id", "span")
                        .from("traces")
                        .where(QueryBuilder.in("trace_id", QueryBuilder.bindMarker("trace_id")))
                        .limit(QueryBuilder.bindMarker("limit_")));

        this.selectDependencies = session.prepare(
                QueryBuilder.select("dependencies")
                        .from("dependencies")
                        .where(QueryBuilder.in("day", QueryBuilder.bindMarker("days"))));

        this.selectServiceNames = session.prepare(
                QueryBuilder.select("service_name").from("service_names"));

        this.selectSpanNames = session.prepare(
                QueryBuilder.select("span_name")
                        .from("span_names")
                        .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
                        .and(QueryBuilder.eq("bucket", QueryBuilder.bindMarker("bucket")))
                        .limit(QueryBuilder.bindMarker("limit_")));

        this.selectTraceIdsByServiceName = session.prepare(
                QueryBuilder.select("ts", "trace_id")
                        .from("service_name_index")
                        .where(QueryBuilder.eq("service_name", QueryBuilder.bindMarker("service_name")))
                        .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                        .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                        .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                        .limit(QueryBuilder.bindMarker("limit_"))
                        .orderBy(QueryBuilder.desc("ts")));

        this.selectTraceIdsBySpanName = session.prepare(
                QueryBuilder.select("ts", "trace_id")
                        .from("service_span_name_index")
                        .where(QueryBuilder.eq("service_span_name", QueryBuilder.bindMarker("service_span_name")))
                        .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                        .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                        .limit(QueryBuilder.bindMarker("limit_"))
                        .orderBy(QueryBuilder.desc("ts")));

        this.selectTraceIdsByAnnotation = session.prepare(
                QueryBuilder.select("ts", "trace_id")
                        .from("annotations_index")
                        .where(QueryBuilder.eq("annotation", QueryBuilder.bindMarker("annotation")))
                        .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                        .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                        .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                        .limit(QueryBuilder.bindMarker("limit_"))
                        .orderBy(QueryBuilder.desc("ts")));

        // The multi-service-name query is only prepared on protocol V4 or later; otherwise the
        // field stays null and getTraces rejects queries that would need it.
        if (protocolVersion.compareTo(ProtocolVersion.V4) < 0) {
            LOG.warn("Please update Cassandra to 2.2 or later, as some features may fail");
            this.selectTraceIdsByServiceNames = null;
        } else {
            this.selectTraceIdsByServiceNames = session.prepare(
                    QueryBuilder.select("ts", "trace_id")
                            .from("service_name_index")
                            .where(QueryBuilder.in("service_name", QueryBuilder.bindMarker("service_name")))
                            .and(QueryBuilder.in("bucket", QueryBuilder.bindMarker("bucket")))
                            .and(QueryBuilder.gte("ts", QueryBuilder.bindMarker("start_ts")))
                            .and(QueryBuilder.lte("ts", QueryBuilder.bindMarker("end_ts")))
                            .limit(QueryBuilder.bindMarker("limit_"))
                            .orderBy(QueryBuilder.desc("ts")));
        }

        this.traceIdToTimestamp = new Function<ResultSet, Map<Long, Long>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.1
            @Override
            public Map<Long, Long> apply(ResultSet resultSet) {
                // LinkedHashMap keeps the row order returned by the query.
                Map<Long, Long> timestampsByTraceId = new LinkedHashMap<>();
                for (Row row : resultSet) {
                    timestampsByTraceId.put(row.getLong("trace_id"), CassandraSpanStore.this.timestampCodec.deserialize(row, "ts"));
                }
                return timestampsByTraceId;
            }
        };
    }

    /**
     * Finds traces matching the request.
     *
     * <p>Candidate trace ids come from the span-name index when a span name is given, from the
     * service-name index when only a service name is given, and otherwise from the service-name
     * index queried with every known service name. When the request yields annotation keys, the
     * ids are the intersection of the per-annotation lookups (plus the span-name lookup when a
     * span name is given); otherwise the candidate ids are used as-is.
     *
     * @param queryRequest the query; {@code minDuration} must be null
     * @return future of the matching traces
     * @throws IllegalArgumentException if {@code minDuration} is set, or if no service name is
     *     given and the multi-service-name statement was not prepared
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<List<Span>>> getTraces(final QueryRequest queryRequest) {
        Preconditions.checkArgument(queryRequest.minDuration == null, "getTraces with duration is unsupported. Upgrade to the new cassandra3 schema.");
        // Index queries ask for limit * indexFetchMultiplier rows.
        final int indexLimit = queryRequest.limit * this.indexFetchMultiplier;

        ListenableFuture<Map<Long, Long>> candidates;
        if (queryRequest.spanName != null) {
            candidates = getTraceIdsBySpanName(queryRequest.serviceName, queryRequest.spanName, queryRequest.endTs * 1000, queryRequest.lookback * 1000, indexLimit);
        } else if (queryRequest.serviceName != null) {
            List<String> onlyService = Collections.singletonList(queryRequest.serviceName);
            candidates = getTraceIdsByServiceNames(onlyService, queryRequest.endTs * 1000, queryRequest.lookback * 1000, indexLimit);
        } else {
            Preconditions.checkArgument(this.selectTraceIdsByServiceNames != null, "getTraces without serviceName requires Cassandra 2.2 or later");
            AsyncFunction<List<String>, Map<Long, Long>> lookupAllServices = new AsyncFunction<List<String>, Map<Long, Long>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.2
                @Override
                public ListenableFuture<Map<Long, Long>> apply(@Nullable List<String> serviceNames) {
                    return CassandraSpanStore.this.getTraceIdsByServiceNames(serviceNames, queryRequest.endTs * 1000, queryRequest.lookback * 1000, indexLimit);
                }
            };
            candidates = Futures.transform(getServiceNames(), lookupAllServices);
        }

        List<String> annotationKeys = CassandraUtil.annotationKeys(queryRequest);
        ListenableFuture<Set<Long>> traceIds;
        if (annotationKeys.isEmpty()) {
            traceIds = Futures.transform(candidates, CassandraUtil.keyset());
        } else {
            List<ListenableFuture<Map<Long, Long>>> toIntersect = new ArrayList<>();
            // The candidate lookup only takes part in the intersection for span-name queries.
            if (queryRequest.spanName != null) {
                toIntersect.add(candidates);
            }
            for (String annotationKey : annotationKeys) {
                toIntersect.add(getTraceIdsByAnnotation(annotationKey, queryRequest.endTs * 1000, queryRequest.lookback * 1000, indexLimit));
            }
            traceIds = Futures.transform(Futures.allAsList(toIntersect), CassandraUtil.intersectKeySets());
        }
        return Futures.transform(traceIds, new AnonymousClass3(queryRequest));
    }

    /**
     * Same as {@link #getRawTrace(long, long)} with a first argument of {@code 0}.
     *
     * @param j passed through as the second argument
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<Span>> getRawTrace(long j) {
        final long noHighBits = 0L;
        return getRawTrace(noHighBits, j);
    }

    /**
     * Fetches the stored spans for one trace id without merging or clock-skew correction.
     *
     * @param j expected {@code traceIdHigh}; only consulted when {@code strictTraceId} is set
     * @param j2 trace id used for the lookup
     * @return future of the spans, or of {@code null} when none remain
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<Span>> getRawTrace(final long j, long j2) {
        Set<Long> singleId = Collections.singleton(Long.valueOf(j2));
        ListenableFuture<List<Span>> fetched = getSpansByTraceIds(singleId, this.maxTraceCols);
        return Futures.transform(fetched, new Function<List<Span>, List<Span>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.4
            @Override
            public List<Span> apply(@Nullable List<Span> spans) {
                if (CassandraSpanStore.this.strictTraceId) {
                    // Drop spans whose non-zero traceIdHigh differs from the requested one.
                    for (Iterator<Span> cursor = spans.iterator(); cursor.hasNext(); ) {
                        long high = cursor.next().traceIdHigh;
                        boolean acceptable = high == 0 || high == j;
                        if (!acceptable) {
                            cursor.remove();
                        }
                    }
                }
                return spans.isEmpty() ? null : spans;
            }
        });
    }

    /**
     * Same as {@link #getTrace(long, long)} with a first argument of {@code 0}.
     *
     * @param j passed through as the second argument
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<Span>> getTrace(long j) {
        final long noHighBits = 0L;
        return getTrace(noHighBits, j);
    }

    /**
     * Fetches the raw trace via {@link #getRawTrace(long, long)} and passes it through
     * {@code AdjustTrace}.
     *
     * @param j forwarded as the first argument of {@code getRawTrace}
     * @param j2 forwarded as the second argument of {@code getRawTrace}
     * @return future of the adjusted spans
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<Span>> getTrace(long j, long j2) {
        ListenableFuture<List<Span>> rawTrace = getRawTrace(j, j2);
        return Futures.transform(rawTrace, AdjustTrace.INSTANCE);
    }

    /**
     * Reads every "service_name" value and returns the distinct names in natural order.
     *
     * @return future of the sorted names; a {@code RuntimeException} raised while binding or
     *     submitting the query is returned as a failed future
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<String>> getServiceNames() {
        Function<ResultSet, List<String>> toSortedNames = new Function<ResultSet, List<String>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.5
            @Override
            public List<String> apply(@Nullable ResultSet resultSet) {
                // The set removes duplicates before sorting.
                Set<String> distinct = new HashSet<>();
                for (Row row : resultSet) {
                    distinct.add(row.getString("service_name"));
                }
                return Ordering.<String>natural().sortedCopy(distinct);
            }
        };
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectServiceNames, "select-service-names");
            return Futures.transform(this.session.executeAsync(bound), toSortedNames);
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Reads the "span_name" values for one service and returns the distinct names in natural
     * order. The query binds the lower-cased service name, bucket {@code 0} and a limit of
     * {@code 1000}.
     *
     * @param str service name; {@code null} or empty yields {@code EMPTY_LIST}
     * @return future of the sorted names; a {@code RuntimeException} raised while binding or
     *     submitting the query is returned as a failed future
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<String>> getSpanNames(String str) {
        boolean missingServiceName = str == null || str.isEmpty();
        if (missingServiceName) {
            return EMPTY_LIST;
        }
        Function<ResultSet, List<String>> toSortedNames = new Function<ResultSet, List<String>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.6
            @Override
            public List<String> apply(@Nullable ResultSet resultSet) {
                // The set removes duplicates before sorting.
                Set<String> distinct = new HashSet<>();
                for (Row row : resultSet) {
                    distinct.add(row.getString("span_name"));
                }
                return Ordering.<String>natural().sortedCopy(distinct);
            }
        };
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectSpanNames, "select-span-names");
            String lowerCased = Preconditions.checkNotNull(str, "serviceName").toLowerCase();
            bound = bound.setString("service_name", lowerCased)
                    .setInt("bucket", 0)
                    .setInt("limit_", 1000);
            return Futures.transform(this.session.executeAsync(bound), toSortedNames);
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Selects the "dependencies" rows for the days produced by {@code Util.getDays(j, l)} and
     * converts them with {@code ConvertDependenciesResponse}.
     *
     * @param j first argument handed to {@code Util.getDays}
     * @param l second argument handed to {@code Util.getDays}; may be {@code null}
     * @return future of the dependency links; a {@code RuntimeException} raised while binding
     *     or submitting the query is returned as a failed future
     */
    @Override // zipkin.storage.guava.GuavaSpanStore
    public ListenableFuture<List<DependencyLink>> getDependencies(long j, @Nullable Long l) {
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectDependencies, "select-dependencies");
            List<?> days = Util.getDays(j, l);
            bound = bound.setList("days", days);
            return Futures.transform(this.session.executeAsync(bound), ConvertDependenciesResponse.INSTANCE);
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Reads the thrift-encoded "span" column for the given trace ids and decodes each row.
     *
     * @param set trace ids to fetch; must not be {@code null}
     * @param i value bound to the query's "limit_" marker
     * @return future of the decoded spans in row order; an immediately-completed empty list
     *     when {@code set} is empty; a {@code RuntimeException} raised while binding or
     *     submitting the query is returned as a failed future
     * @throws NullPointerException if {@code set} is {@code null}
     */
    ListenableFuture<List<Span>> getSpansByTraceIds(Set<Long> set, int i) {
        Preconditions.checkNotNull(set, "traceIds");
        if (set.isEmpty()) {
            return Futures.immediateFuture(Collections.<Span>emptyList());
        }
        try {
            BoundStatement boundStatement = CassandraUtil.bindWithName(this.selectTraces, "select-traces")
                    .setSet("trace_id", set)
                    .setInt("limit_", i);
            boundStatement.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform(this.session.executeAsync(boundStatement), new Function<ResultSet, List<Span>>() { // from class: zipkin.storage.cassandra.CassandraSpanStore.7
                @Override
                public List<Span> apply(@Nullable ResultSet resultSet) {
                    List<Span> spans = new ArrayList<>(resultSet.getAvailableWithoutFetching());
                    for (Row row : resultSet) {
                        // "span" is the column selected by the select-traces statement.
                        spans.add(Codec.THRIFT.readSpan(row.getBytes("span")));
                    }
                    return spans;
                }
            });
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Looks up trace ids and timestamps in "service_name_index" for the given service names
     * within {@code [max(j - j2, 0), j]}. A single name uses the equality statement; several
     * names use the IN statement.
     *
     * @param list service names; an empty list yields an immediately-completed empty map
     * @param j upper timestamp bound, bound to "end_ts"
     * @param j2 subtracted from {@code j} (floored at 0) to obtain "start_ts"
     * @param i value bound to the query's "limit_" marker
     * @return future of trace id to timestamp in row order; a {@code RuntimeException} raised
     *     while binding or submitting the query is returned as a failed future
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsByServiceNames(List<String> list, long j, long j2, int i) {
        if (list.isEmpty()) {
            return Futures.immediateFuture(Collections.<Long, Long>emptyMap());
        }
        long startTs = Math.max(j - j2, 0L);
        try {
            // Only the statement and the service_name binding differ between the two forms.
            BoundStatement bound;
            if (list.size() == 1) {
                bound = CassandraUtil.bindWithName(this.selectTraceIdsByServiceName, "select-trace-ids-by-service-name")
                        .setString("service_name", list.get(0));
            } else {
                bound = CassandraUtil.bindWithName(this.selectTraceIdsByServiceNames, "select-trace-ids-by-service-names")
                        .setList("service_name", list);
            }
            bound = bound.setSet("bucket", this.buckets)
                    .setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs))
                    .setBytesUnsafe("end_ts", this.timestampCodec.serialize(j))
                    .setInt("limit_", i);
            bound.setFetchSize(Integer.MAX_VALUE);
            return Futures.transform(this.session.executeAsync(bound), this.traceIdToTimestamp);
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Looks up trace ids and timestamps in "service_span_name_index" for the key
     * {@code str + "." + str2} within {@code [max(j - j2, 0), j]}.
     *
     * @param str service name; must not be {@code null}
     * @param str2 span name; must not be {@code null}
     * @param j upper timestamp bound, bound to "end_ts"
     * @param j2 subtracted from {@code j} (floored at 0) to obtain "start_ts"
     * @param i value bound to the query's "limit_" marker
     * @return future of trace id to timestamp in row order; a {@code RuntimeException} raised
     *     while binding or submitting the query is returned as a failed future
     * @throws IllegalArgumentException if either name is {@code null}
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsBySpanName(String str, String str2, long j, long j2, int i) {
        Preconditions.checkArgument(str != null, "serviceName required on spanName query");
        Preconditions.checkArgument(str2 != null, "spanName required on spanName query");
        try {
            BoundStatement bound = CassandraUtil.bindWithName(this.selectTraceIdsBySpanName, "select-trace-ids-by-span-name");
            String indexKey = str + "." + str2;
            bound = bound.setString("service_span_name", indexKey);
            long startTs = Math.max(j - j2, 0L);
            bound = bound.setBytesUnsafe("start_ts", this.timestampCodec.serialize(startTs))
                    .setBytesUnsafe("end_ts", this.timestampCodec.serialize(j))
                    .setInt("limit_", i);
            return Futures.transform(this.session.executeAsync(bound), this.traceIdToTimestamp);
        } catch (RuntimeException e) {
            return Futures.immediateFailedFuture(e);
        }
    }

    /**
     * Looks up trace ids and timestamps in "annotations_index" for one annotation key within
     * {@code [max(j - j2, 0), j]}.
     *
     * @param str annotation key; encoded with {@code CassandraUtil.toByteBuffer}
     * @param j upper timestamp bound, bound to "end_ts"
     * @param j2 subtracted from {@code j} (floored at 0) to obtain "start_ts"
     * @param i value bound to the query's "limit_" marker
     * @return future of trace id to timestamp in row order; a {@code RuntimeException} or
     *     {@code CharacterCodingException} raised while binding or submitting the query is
     *     returned as a failed future
     */
    ListenableFuture<Map<Long, Long>> getTraceIdsByAnnotation(String str, long j, long j2, int i) {
        try {
            BoundStatement boundStatement = CassandraUtil.bindWithName(this.selectTraceIdsByAnnotation, "select-trace-ids-by-annotation")
                    .setBytes("annotation", CassandraUtil.toByteBuffer(str))
                    .setSet("bucket", this.buckets)
                    .setBytesUnsafe("start_ts", this.timestampCodec.serialize(Math.max(j - j2, 0L)))
                    .setBytesUnsafe("end_ts", this.timestampCodec.serialize(j))
                    .setInt("limit_", i);
            boundStatement.setFetchSize(Integer.MAX_VALUE);
            // Reuse the shared row converter, as the other index lookups do, instead of
            // duplicating the same trace_id/ts mapping inline.
            return Futures.transform(this.session.executeAsync(boundStatement), this.traceIdToTimestamp);
        } catch (RuntimeException | CharacterCodingException e) {
            return Futures.immediateFailedFuture(e);
        }
    }
}
