package io.activej.dataflow.calcite;

import io.activej.codegen.DefiningClassLoader;
import io.activej.common.Checks;
import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.ISqlDataflow;
import io.activej.dataflow.calcite.RelToDatasetConverter;
import io.activej.dataflow.calcite.optimizer.FilterScanTableRule;
import io.activej.dataflow.calcite.optimizer.SortScanTableRule;
import io.activej.dataflow.collector.AbstractCollector;
import io.activej.dataflow.collector.ICollector;
import io.activej.dataflow.collector.MergeCollector;
import io.activej.dataflow.collector.UnionCollector;
import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.dataset.LocallySortedDataset;
import io.activej.dataflow.exception.DataflowException;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSuppliers;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.record.Record;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
import org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
import org.apache.calcite.rel.metadata.RelMetadataQueryBase;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.Program;
import org.apache.calcite.tools.Programs;

/* loaded from: input_file:io/activej/dataflow/calcite/SqlDataflow.class */
public final class SqlDataflow extends AbstractReactive implements ISqlDataflow {
    private final DataflowClient client;
    private final List<Partition> partitions;
    private final SqlParser.Config parserConfig;
    private final SqlToRelConverter converter;
    private final SqlValidator validator;
    private final RelOptPlanner planner;
    private final DefiningClassLoader classLoader;
    private final RelTraitSet traits;

    private SqlDataflow(Reactor reactor, DataflowClient dataflowClient, List<Partition> list, SqlParser.Config config, SqlToRelConverter sqlToRelConverter, RelOptPlanner relOptPlanner, DefiningClassLoader definingClassLoader) {
        super(reactor);
        this.traits = RelTraitSet.createEmpty();
        this.client = dataflowClient;
        this.partitions = list;
        this.parserConfig = config;
        this.converter = sqlToRelConverter;
        this.validator = (SqlValidator) Checks.checkNotNull(sqlToRelConverter.validator);
        this.planner = relOptPlanner;
        this.classLoader = definingClassLoader;
    }

    public static SqlDataflow create(Reactor reactor, DataflowClient dataflowClient, List<Partition> list, SqlParser.Config config, SqlToRelConverter sqlToRelConverter, RelOptPlanner relOptPlanner, DefiningClassLoader definingClassLoader) {
        return new SqlDataflow(reactor, dataflowClient, list, config, sqlToRelConverter, relOptPlanner, definingClassLoader);
    }

    @Override // io.activej.dataflow.ISqlDataflow
    public Promise<StreamSupplier<Record>> query(String str) {
        Reactive.checkInReactorThread(this);
        try {
            return Promise.of(queryDataflow(convertToDataset(str)));
        } catch (DataflowException | SqlParseException e) {
            return Promise.ofException(e);
        }
    }

    public RelNode convertToNode(String str) throws SqlParseException, DataflowException {
        SqlNode validate = this.validator.validate(SqlParser.create(str, this.parserConfig).parseQuery());
        if (validate.getKind() == SqlKind.SELECT || validate.getKind() == SqlKind.UNION) {
            return optimize(convert(validate));
        }
        throw new DataflowException("Only 'SELECT' queries are allowed");
    }

    public Dataset<Record> convertToDataset(String str, long j) throws SqlParseException, DataflowException {
        return convert(convertToNode(str), j).unmaterializedDataset().materialize(Collections.emptyList());
    }

    public Dataset<Record> convertToDataset(String str) throws SqlParseException, DataflowException {
        return convertToDataset(str, -1L);
    }

    public RelToDatasetConverter.ConversionResult convert(RelNode relNode, long j) {
        return RelToDatasetConverter.convert(this.classLoader, relNode, j, this.partitions.size());
    }

    public RelToDatasetConverter.ConversionResult convert(RelNode relNode) {
        return convert(relNode, -1L);
    }

    public StreamSupplier<Record> queryDataflow(Dataset<Record> dataset) {
        Reactive.checkInReactorThread(this);
        return queryDataflow(dataset, -1L);
    }

    public StreamSupplier<Record> queryDataflow(Dataset<Record> dataset, long j) {
        Reactive.checkInReactorThread(this);
        if (j == 0) {
            return StreamSuppliers.empty();
        }
        ICollector<Record> createCollector = createCollector(dataset, j);
        DataflowGraph dataflowGraph = new DataflowGraph(this.reactor, this.client, this.partitions);
        StreamSupplier<Record> compile = createCollector.compile(dataflowGraph);
        Promise<Void> execute = dataflowGraph.execute();
        Objects.requireNonNull(compile);
        execute.whenException(compile::closeEx);
        return compile;
    }

    private RelRoot convert(SqlNode sqlNode) {
        if (RelMetadataQueryBase.THREAD_PROVIDERS.get() == null) {
            RelMetadataQueryBase.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.DEFAULT);
        }
        return this.converter.convertQuery(sqlNode, false, true);
    }

    private RelNode optimize(RelRoot relRoot) {
        HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
        hepProgramBuilder.addRuleCollection(List.of(CoreRules.FILTER_INTO_JOIN, FilterScanTableRule.create(), SortScanTableRule.create()));
        return Programs.sequence(new Program[]{Programs.of(hepProgramBuilder.build(), true, DefaultRelMetadataProvider.INSTANCE)}).run(this.planner, relRoot.project(), this.traits, Collections.emptyList(), Collections.emptyList());
    }

    public CalciteSchema getSchema() {
        return this.validator.getCatalogReader().getRootSchema();
    }

    public RelDataTypeFactory getTypeFactory() {
        return this.validator.getTypeFactory();
    }

    public String explainPlan(String str) throws SqlParseException, DataflowException {
        return convertToNode(str).explain();
    }

    public String explainGraph(String str, long j) throws SqlParseException, DataflowException {
        return convertToDataset(str, j).toGraphViz();
    }

    public String explainGraph(String str) throws SqlParseException, DataflowException {
        return explainGraph(str, -1L);
    }

    public String explainNodes(String str, long j) throws SqlParseException, DataflowException {
        ICollector<Record> createCollector = createCollector(convertToDataset(str, j), j);
        DataflowGraph dataflowGraph = new DataflowGraph(this.reactor, this.client, this.partitions);
        createCollector.compile(dataflowGraph);
        return dataflowGraph.toGraphViz();
    }

    public String explainNodes(String str) throws SqlParseException, DataflowException {
        return explainNodes(str, -1L);
    }

    private <B extends AbstractCollector<Record, ?>.Builder<B, C>, C extends AbstractCollector<Record, ?>> ICollector<Record> createCollector(Dataset<Record> dataset, long j) {
        AbstractCollector.Builder builder;
        if (dataset instanceof LocallySortedDataset) {
            builder = MergeCollector.builder(this.reactor, (LocallySortedDataset) dataset, this.client);
        } else {
            builder = UnionCollector.builder(this.reactor, dataset, this.client);
        }
        AbstractCollector.Builder builder2 = builder;
        if (j != -1) {
            builder2.withLimit(j);
        }
        return (ICollector) builder2.build();
    }
}
