package net.e6tech.elements.cassandra.etl;

import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import net.e6tech.elements.cassandra.async.Async;
import net.e6tech.elements.cassandra.etl.Partition;
import net.e6tech.elements.cassandra.etl.PartitionContext;
import net.e6tech.elements.common.resources.Resources;
import net.e6tech.elements.common.util.TextBuilder;

/* loaded from: input_file:net/e6tech/elements/cassandra/etl/PartitionStrategy.class */
public class PartitionStrategy<S extends Partition, C extends PartitionContext> implements BatchStrategy<S, C> {
    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy
    public int load(C c, List<S> list) {
        if (c.getLoadDelegate() != null) {
            return c.getLoadDelegate().apply(list).intValue();
        }
        return 0;
    }

    public Map<Comparable, Long> queryPartitions(C c) {
        LastUpdate lastUpdate = c.getLastUpdate();
        Comparable cutoff = c.getCutoff();
        String partitionKeyColumn = c.getInspector().getPartitionKeyColumn(0);
        String tableName = c.tableName();
        HashMap hashMap = new HashMap();
        ArrayList<Comparable> arrayList = new ArrayList();
        c.open().accept(Resources.class, resources -> {
            for (Row row : ((Session) resources.getInstance(Session.class)).execute(TextBuilder.using("select ${pk}, count(*) from ${table} where ${pk} > ${start} and ${pk} < ${end} group by ${pk} allow filtering").build("pk", partitionKeyColumn, "table", tableName, "start", lastUpdate.getLastUpdate(), "end", cutoff)).all()) {
                hashMap.put((Comparable) row.get(0, c.getPartitionKeyType()), Long.valueOf(row.getLong(1)));
                arrayList.add(Long.valueOf(row.getLong(0)));
            }
        });
        Collections.sort(arrayList);
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Comparable comparable : arrayList) {
            linkedHashMap.put(comparable, hashMap.get(comparable));
        }
        return linkedHashMap;
    }

    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy
    public List<S> extract(C c) {
        String build = TextBuilder.using("select * from ${table} where ${pk} = :partitionKey").build("table", c.tableName(), "pk", c.getInspector().getPartitionKeyColumn(0));
        Async createAsync = c.createAsync(c.getPreparedStatements().computeIfAbsent("extract", str -> {
            return c.getSession().prepare(build);
        }));
        for (Comparable comparable : c.getPartitions()) {
            createAsync.execute(boundStatement -> {
                boundStatement.set("partitionKey", comparable, comparable.getClass());
            });
        }
        ArrayList arrayList = new ArrayList();
        createAsync.inExecutionOrder(resultSet -> {
            arrayList.addAll(c.getMapper(c.getSourceClass()).map(resultSet).all());
        });
        return arrayList;
    }

    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy, net.e6tech.elements.cassandra.etl.Strategy
    public int run(C c) {
        int i = 0;
        c.initialize();
        Map<Comparable, Long> queryPartitions = queryPartitions(c);
        logger.info("Extracting Class {} to {}", c.getSourceClass(), getClass());
        c.reset();
        ArrayList arrayList = new ArrayList();
        while (queryPartitions.size() > 0) {
            LastUpdate lastUpdate = c.getLastUpdate();
            arrayList.clear();
            boolean z = true;
            long j = 0;
            for (Map.Entry<Comparable, Long> entry : queryPartitions.entrySet()) {
                if (!z) {
                    if (j + entry.getValue().longValue() > c.getBatchSize()) {
                        break;
                    }
                    j += entry.getValue().longValue();
                    arrayList.add(entry.getKey());
                } else {
                    arrayList.add(entry.getKey());
                    z = false;
                    j = entry.getValue().longValue();
                }
            }
            for (Comparable comparable : arrayList) {
                queryPartitions.remove(comparable);
                lastUpdate.update(comparable);
            }
            i += run(arrayList, c);
            c.saveLastUpdate(lastUpdate);
        }
        logger.info("Done loading {} instances of {}", Integer.valueOf(i), c.getSourceClass());
        c.reset();
        return i;
    }

    public int run(List<Comparable> list, C c) {
        c.setPartitions(list);
        int load = load((PartitionStrategy<S, C>) c, extract((PartitionStrategy<S, C>) c));
        if (logger.isInfoEnabled()) {
            logger.info("Processed {} instance of {}", Integer.valueOf(load), c.extractor());
        }
        return load;
    }
}
