package org.cloudgraph.hbase.service;

import commonj.sdo.DataGraph;
import commonj.sdo.Type;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.RandomRowFilter;
import org.cloudgraph.hbase.connect.Connection;
import org.cloudgraph.hbase.connect.HBaseConnectionManager;
import org.cloudgraph.hbase.io.DistributedGraphOperation;
import org.cloudgraph.hbase.io.DistributedGraphReader;
import org.cloudgraph.hbase.io.TableReader;
import org.cloudgraph.hbase.scan.CompleteRowKey;
import org.cloudgraph.hbase.scan.FuzzyRowKey;
import org.cloudgraph.hbase.scan.PartialRowKey;
import org.cloudgraph.hbase.scan.PartialRowKeyScanAssembler;
import org.cloudgraph.hbase.scan.ScanCollector;
import org.cloudgraph.hbase.util.FilterUtil;
import org.cloudgraph.query.expr.Expr;
import org.cloudgraph.query.expr.ExprPrinter;
import org.cloudgraph.recognizer.GraphRecognizerSyntaxTreeAssembler;
import org.cloudgraph.store.service.GraphServiceException;
import org.plasma.query.collector.SelectionCollector;
import org.plasma.query.model.From;
import org.plasma.query.model.OrderBy;
import org.plasma.query.model.Query;
import org.plasma.query.model.Where;
import org.plasma.sdo.PlasmaDataGraph;
import org.plasma.sdo.PlasmaType;
import org.plasma.sdo.access.StreamQueryDispatcher;
import org.plasma.sdo.access.provider.common.DataGraphComparatorAssembler;
import org.plasma.sdo.helper.PlasmaTypeHelper;

/* loaded from: input_file:org/cloudgraph/hbase/service/GraphStreamQuery.class */
public class GraphStreamQuery extends GraphQuery implements ObservableOnSubscribe<DataGraph>, StreamQueryDispatcher {
    private static Log log = LogFactory.getLog(GraphStreamQuery.class);
    private Query query;
    private Timestamp snapshotDate;
    private int requestMax;
    private ObservableEmitter<DataGraph> emitter;

    public GraphStreamQuery(ServiceContext serviceContext) {
        super(serviceContext);
    }

    @Override // org.cloudgraph.hbase.service.GraphQuery
    public void close() {
        this.context.close();
    }

    public Observable<DataGraph> findAsStream(Query query, Timestamp timestamp) {
        this.query = query;
        this.snapshotDate = timestamp;
        this.requestMax = -1;
        return Observable.create(this);
    }

    public Observable<DataGraph> findAsStream(Query query, int i, Timestamp timestamp) {
        this.query = query;
        this.snapshotDate = timestamp;
        this.requestMax = i;
        return Observable.create(this);
    }

    public void subscribe(ObservableEmitter<DataGraph> observableEmitter) throws Exception {
        this.emitter = observableEmitter;
        From fromClause = this.query.getFromClause();
        PlasmaType plasmaType = (PlasmaType) PlasmaTypeHelper.INSTANCE.getType(fromClause.getEntity().getNamespaceURI(), fromClause.getEntity().getName());
        Where findWhereClause = this.query.findWhereClause();
        SelectionCollector selectionCollector = findWhereClause != null ? new SelectionCollector(this.query.getSelectClause(), findWhereClause, plasmaType) : new SelectionCollector(this.query.getSelectClause(), plasmaType);
        selectionCollector.setOnlyDeclaredProperties(false);
        Iterator it = selectionCollector.getTypes().iterator();
        while (it.hasNext()) {
            collectRowKeyProperties(selectionCollector, (PlasmaType) ((Type) it.next()));
        }
        if (log.isDebugEnabled()) {
            log.debug(selectionCollector.dumpInheritedProperties());
        }
        Filter filter = createRootColumnFilterAssembler(plasmaType, selectionCollector).getFilter();
        List<PartialRowKey> arrayList = new ArrayList();
        List<FuzzyRowKey> arrayList2 = new ArrayList();
        List<CompleteRowKey> arrayList3 = new ArrayList();
        Expr expr = null;
        if (findWhereClause != null) {
            expr = new GraphRecognizerSyntaxTreeAssembler(findWhereClause, plasmaType).getResult();
            if (log.isDebugEnabled()) {
                ExprPrinter exprPrinter = new ExprPrinter();
                expr.accept(exprPrinter);
                log.debug("Graph Recognizer: " + exprPrinter.toString());
            }
            ScanCollector scanCollector = new ScanCollector(plasmaType);
            expr.accept(scanCollector);
            arrayList = scanCollector.getPartialRowKeyScans();
            arrayList2 = scanCollector.getFuzzyRowKeyScans();
            arrayList3 = scanCollector.getCompleteRowKeys();
            if (!scanCollector.isQueryRequiresGraphRecognizer()) {
                expr = null;
            }
        }
        if (findWhereClause == null || (arrayList.size() == 0 && arrayList2.size() == 0 && arrayList3.size() == 0)) {
            PartialRowKeyScanAssembler partialRowKeyScanAssembler = new PartialRowKeyScanAssembler(plasmaType);
            partialRowKeyScanAssembler.assemble();
            byte[] startKey = partialRowKeyScanAssembler.getStartKey();
            if (startKey != null && startKey.length > 0) {
                if ((this.query.getStartRange() == null || this.query.getStartRange().intValue() <= 0) && (this.query.getEndRange() == null || this.query.getEndRange().intValue() <= 0)) {
                    log.warn("no root predicate or range limit present - using default graph partial key scan - could result in very large results set");
                }
                arrayList.add(partialRowKeyScanAssembler);
            }
        }
        Comparator<PlasmaDataGraph> comparator = null;
        OrderBy findOrderByClause = this.query.findOrderByClause();
        if (findOrderByClause != null) {
            comparator = new DataGraphComparatorAssembler(findOrderByClause, plasmaType).getComparator();
        }
        executeAsStream(this.query, selectionCollector, plasmaType, filter, expr, comparator, arrayList, arrayList2, arrayList3, this.snapshotDate);
    }

    protected void executeAsStream(Query query, SelectionCollector selectionCollector, PlasmaType plasmaType, Filter filter, Expr expr, Comparator<PlasmaDataGraph> comparator, List<PartialRowKey> list, List<FuzzyRowKey> list2, List<CompleteRowKey> list3, Timestamp timestamp) {
        Connection connection = HBaseConnectionManager.instance().getConnection();
        DistributedGraphOperation distributedGraphOperation = null;
        try {
            try {
                DistributedGraphReader distributedGraphReader = new DistributedGraphReader(plasmaType, selectionCollector.getTypes(), connection);
                GraphAssemblerFactory graphAssemblerFactory = new GraphAssemblerFactory(query, plasmaType, distributedGraphReader, selectionCollector, timestamp);
                TableReader rootTableReader = distributedGraphReader.getRootTableReader();
                ResultsAssembler createResultsAssembler = createResultsAssembler(query, selectionCollector, expr, comparator, null, null, rootTableReader, graphAssemblerFactory);
                long currentTimeMillis = System.currentTimeMillis();
                if (list.size() <= 0 && list2.size() <= 0 && list3.size() <= 0) {
                    FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
                    filterList.addFilter(filter);
                    Scan scan = new Scan();
                    scan.setFilter(filterList);
                    Float randomSample = query.getFromClause().getRandomSample();
                    if (randomSample == null) {
                        log.warn("query resulted in no filters or scans - using full table scan - could result in very large results set");
                    } else {
                        filterList.addFilter(new RandomRowFilter(randomSample.floatValue()));
                        log.warn("using random-sample scan (" + randomSample + ") - could result in very large results set");
                    }
                    execute(scan, rootTableReader, createResultsAssembler);
                } else {
                    if (list3.size() > 0) {
                        throw new GraphServiceException("expected scan query for streaming context - use a non-streaming API for this query");
                    }
                    for (PartialRowKey partialRowKey : list) {
                        if (createResultsAssembler.isResultEndRangeReached()) {
                            break;
                        } else {
                            execute(partialRowKey, rootTableReader, filter, createResultsAssembler);
                        }
                    }
                    for (FuzzyRowKey fuzzyRowKey : list2) {
                        if (createResultsAssembler.isResultEndRangeReached()) {
                            break;
                        } else {
                            execute(fuzzyRowKey, rootTableReader, filter, createResultsAssembler);
                        }
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug("stream complete " + String.valueOf(createResultsAssembler.size()) + " assembled, " + String.valueOf(createResultsAssembler.getIgnoredResults()) + " ignored, " + String.valueOf(createResultsAssembler.getUnrecognizedResults()) + " unrecognized (" + String.valueOf(System.currentTimeMillis() - currentTimeMillis) + ")");
                }
                try {
                    connection.close();
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                }
                distributedGraphReader.close();
                this.emitter.onComplete();
            } finally {
                try {
                    connection.close();
                } catch (IOException e2) {
                    log.error(e2.getMessage(), e2);
                }
                distributedGraphOperation.close();
                this.emitter.onComplete();
            }
        } catch (IOException e3) {
            this.emitter.onError(new GraphServiceException(e3));
        } catch (Throwable th) {
            this.emitter.onError(new GraphServiceException(th));
            try {
                connection.close();
            } catch (IOException e4) {
                log.error(e4.getMessage(), e4);
            }
            distributedGraphOperation.close();
            this.emitter.onComplete();
        }
    }

    @Override // org.cloudgraph.hbase.service.GraphQuery
    protected void execute(Scan scan, TableReader tableReader, ResultsAssembler resultsAssembler) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("executing scan...");
        }
        if (log.isDebugEnabled()) {
            log.debug(FilterUtil.printFilterTree(scan.getFilter()));
        }
        ResultScanner<Result> scanner = tableReader.getTable().getScanner(scan);
        try {
            for (Result result : scanner) {
                if (log.isDebugEnabled()) {
                    log.debug(String.valueOf(tableReader.getTableConfig().getName()) + ": " + new String(result.getRow()));
                    for (KeyValue keyValue : result.list()) {
                        log.debug("\tkey: " + new String(keyValue.getQualifier()) + "\tvalue: " + new String(keyValue.getValue()));
                    }
                }
                if (resultsAssembler.isResultEndRangeReached()) {
                    break;
                } else if (resultsAssembler.collect(result)) {
                    this.emitter.onNext(resultsAssembler.getCurrentResult());
                }
            }
        } finally {
            if (scanner != null) {
                scanner.close();
            }
        }
    }

    @Override // org.cloudgraph.hbase.service.GraphQuery
    protected ResultsAssembler createResultsAssembler(Query query, SelectionCollector selectionCollector, Expr expr, Comparator<PlasmaDataGraph> comparator, Comparator<PlasmaDataGraph> comparator2, Expr expr2, TableReader tableReader, GraphAssemblerFactory graphAssemblerFactory) {
        return new StreamingResultsAssembler(expr, comparator, tableReader, graphAssemblerFactory.createAssembler(), query.getStartRange(), query.getEndRange());
    }
}
