package io.basestar.storage.spark;

import com.google.common.collect.ImmutableList;
import io.basestar.expression.Expression;
import io.basestar.schema.Consistency;
import io.basestar.schema.ObjectSchema;
import io.basestar.schema.aggregate.Aggregate;
import io.basestar.spark.SparkSchemaUtils;
import io.basestar.spark.expression.SparkExpressionVisitor;
import io.basestar.storage.Storage;
import io.basestar.storage.StorageTraits;
import io.basestar.storage.util.Pager;
import io.basestar.util.PagedList;
import io.basestar.util.PagingToken;
import io.basestar.util.Sort;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/basestar/storage/spark/SparkStorage.class */
public class SparkStorage implements Storage {
    private static final Logger log = LoggerFactory.getLogger(SparkStorage.class);
    private final SparkSession session;
    private final SparkRouting routing;
    private final ExecutorService executor;

    /* loaded from: input_file:io/basestar/storage/spark/SparkStorage$Builder.class */
    public static class Builder {
        private SparkSession session;
        private SparkRouting routing;

        public SparkStorage build() {
            return new SparkStorage(this);
        }

        public Builder setSession(SparkSession sparkSession) {
            this.session = sparkSession;
            return this;
        }

        public Builder setRouting(SparkRouting sparkRouting) {
            this.routing = sparkRouting;
            return this;
        }
    }

    private SparkStorage(Builder builder) {
        this.session = builder.session;
        this.routing = builder.routing;
        this.executor = Executors.newSingleThreadExecutor();
    }

    public static Builder builder() {
        return new Builder();
    }

    public CompletableFuture<Map<String, Object>> readObject(ObjectSchema objectSchema, String str) {
        return CompletableFuture.supplyAsync(() -> {
            Dataset<Row> objectRead = this.routing.objectRead(this.session, objectSchema);
            Row row = (Row) objectRead.filter(objectRead.col("id").equalTo(str)).first();
            if (row == null) {
                return null;
            }
            return SparkSchemaUtils.fromSpark(objectSchema, row);
        }, this.executor);
    }

    public CompletableFuture<Map<String, Object>> readObjectVersion(ObjectSchema objectSchema, String str, long j) {
        return CompletableFuture.supplyAsync(() -> {
            Dataset<Row> historyRead = this.routing.historyRead(this.session, objectSchema);
            Row row = (Row) historyRead.filter(historyRead.col("id").equalTo(str).and(historyRead.col("version").equalTo(Long.valueOf(j)))).first();
            if (row == null) {
                return null;
            }
            return SparkSchemaUtils.fromSpark(objectSchema, row);
        }, this.executor);
    }

    public List<Pager.Source<Map<String, Object>>> query(ObjectSchema objectSchema, Expression expression, List<Sort> list) {
        return ImmutableList.of((i, pagingToken) -> {
            return CompletableFuture.supplyAsync(() -> {
                Dataset<Row> objectRead = this.routing.objectRead(this.session, objectSchema);
                Dataset filter = objectRead.filter((Column) expression.visit(new SparkExpressionVisitor(path -> {
                    return objectRead.col(path.toString());
                })));
                if (pagingToken != null) {
                }
                return new PagedList((List) filter.limit(i).collectAsList().stream().map(row -> {
                    return SparkSchemaUtils.fromSpark(objectSchema, row);
                }).collect(Collectors.toList()), (PagingToken) null);
            }, this.executor);
        });
    }

    public List<Pager.Source<Map<String, Object>>> aggregate(ObjectSchema objectSchema, Expression expression, Map<String, Expression> map, Map<String, Aggregate> map2) {
        throw new UnsupportedOperationException();
    }

    public Storage.ReadTransaction read(Consistency consistency) {
        return new Storage.ReadTransaction.Basic(this);
    }

    public Storage.WriteTransaction write(Consistency consistency) {
        throw new UnsupportedOperationException();
    }

    public Storage.EventStrategy eventStrategy(ObjectSchema objectSchema) {
        return Storage.EventStrategy.SUPPRESS;
    }

    public StorageTraits storageTraits(ObjectSchema objectSchema) {
        return SparkStorageTraits.INSTANCE;
    }
}
