package net.e6tech.elements.cassandra.etl;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import net.e6tech.elements.cassandra.Session;
import net.e6tech.elements.cassandra.Sibyl;
import net.e6tech.elements.cassandra.async.AsyncPrepared;
import net.e6tech.elements.cassandra.driver.cql.Row;
import net.e6tech.elements.cassandra.etl.Partition;
import net.e6tech.elements.cassandra.etl.PartitionContext;
import net.e6tech.elements.common.reflection.ObjectConverter;
import net.e6tech.elements.common.resources.Resources;
import net.e6tech.elements.common.util.SystemException;
import net.e6tech.elements.common.util.TextBuilder;
import net.e6tech.elements.common.util.datastructure.Pair;

/* loaded from: input_file:net/e6tech/elements/cassandra/etl/PartitionStrategy.class */
public class PartitionStrategy<S extends Partition, C extends PartitionContext> implements BatchStrategy<S, C> {
    /**
     * CQL template that selects every partition key strictly between {@code ${start}} and {@code ${end}}
     * together with its row count ({@code group by} the key, {@code allow filtering}).
     * The {@code ${...}} placeholders are filled in through {@link TextBuilder}.
     */
    public static final String QUERY_PARTITION = "select ${pk}, count(*) from ${table} where ${pk} > ${start} and ${pk} < ${end} group by ${pk} allow filtering";
    /** CQL template that selects the key and row count of a single partition bound to {@code :pk}. */
    public static final String ASYNC_QUERY_PARTITION = "select ${pk}, count(*) from ${table} where ${pk} = :pk";
    /** CQL template that selects the distinct partition keys strictly between {@code ${start}} and {@code ${end}}. */
    public static final String QUERY_RANGE = "select distinct ${pk} from ${table} where ${pk} > ${start} and ${pk} < ${end} allow filtering";
    /** CQL template that selects the key of a single partition bound to {@code :pk}. */
    public static final String ASYNC_QUERY_RANGE = "select ${pk} from ${table} where ${pk} = :pk";
    /** Converts the textual form of a numeric range bound or key into the context's partition key type before binding. */
    private ObjectConverter converter = new ObjectConverter();
    /** Timing message of the most recent partition/range query; appended to the summary log lines of the run methods. */
    private String partitionTiming;

    /* loaded from: input_file:net/e6tech/elements/cassandra/etl/PartitionStrategy$PartitionQuery.class */
    /**
     * Snapshot of the values a partition query needs, all read from a {@link PartitionContext}
     * at construction time.
     *
     * @param <C> the concrete context type
     */
    public static class PartitionQuery<C extends PartitionContext> {
        /** Context the query was created from. */
        C context;
        /** Last-update marker; its value is used as the exclusive lower bound of the query. */
        LastUpdate lastUpdate;
        /** Cutoff reported by the context; used as the exclusive upper bound of the query. */
        Comparable<?> end;
        /** Name of the first partition key column. */
        String partitionKey;
        /** Table to query. */
        String table;
        /** Number of retries for a failed async query; never negative after construction. */
        int retries;
        /** Width of one async range, as reported by the context; may be {@code null}. */
        Integer asyncStep;
        /** Maximum number of ranges produced per async round; defaults to 100. */
        Integer asyncMaxChunkSize;

        /**
         * Captures the query parameters from the given context.
         *
         * @param c context to read from
         */
        public PartitionQuery(C c) {
            context = c;
            lastUpdate = c.getLastUpdate();
            end = c.getCutoff();
            partitionKey = c.getInspector().getPartitionKeyColumn(0);
            table = c.tableName();
            asyncStep = c.getAsyncTimeUnitStepSize();
            // Fall back to 100 chunks when the context does not configure a limit.
            asyncMaxChunkSize = (c.getAsyncMaxNumOfChunks() != null)
                    ? c.getAsyncMaxNumOfChunks()
                    : Integer.valueOf(100);
            // A negative retry count is treated as "no retries".
            retries = Math.max(c.getRetries(), 0);
        }
    }

    /* loaded from: input_file:net/e6tech/elements/cassandra/etl/PartitionStrategy$Range.class */
    /**
     * A pair of numeric bounds handed to the async queries. The bounds are bound to
     * {@code pk > :start and pk < :end}, so both ends are exclusive.
     */
    public static class Range {
        /** Exclusive lower bound. */
        BigDecimal start;
        /** Exclusive upper bound. */
        BigDecimal end;

        /**
         * @param start exclusive lower bound
         * @param end   exclusive upper bound
         */
        public Range(BigDecimal start, BigDecimal end) {
            this.start = start;
            this.end = end;
        }

        /** @return a short description of both bounds, intended for log output */
        @Override
        public String toString() {
            return new StringBuilder("range start= ")
                    .append(start)
                    .append(" end=")
                    .append(end)
                    .toString();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Hands the extracted instances to the context's load delegate.
     *
     * @param c    context supplying the load delegate
     * @param list extracted instances
     * @return the delegate's result, or 0 when the context has no load delegate
     */
    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy
    public int load(C c, List<S> list) {
        return c.getLoadDelegate() == null ? 0 : c.getLoadDelegate().applyAsInt(list);
    }

    /**
     * Runs {@link #QUERY_PARTITION} synchronously and returns the row count of every partition
     * strictly between the last update and the query's end, ordered by partition key.
     * Also records how long the query took in {@code partitionTiming}.
     *
     * @param partitionQuery query parameters
     * @return partition key to row count, in ascending key order
     */
    public Map<Comparable<Object>, Long> queryPartitions(PartitionQuery<C> partitionQuery) {
        final long startedAt = System.currentTimeMillis();
        final String cql = TextBuilder.using(QUERY_PARTITION).build(
                "pk", partitionQuery.partitionKey,
                "table", partitionQuery.table,
                "start", partitionQuery.lastUpdate.getLastUpdate(),
                "end", partitionQuery.end);
        final Map<Comparable<Object>, Long> counts = Collections.synchronizedMap(new TreeMap<>());
        final List<Comparable<Object>> keys = Collections.synchronizedList(new LinkedList<>());

        partitionQuery.context.open().accept(Resources.class, resources -> {
            try {
                Session session = (Session) resources.getInstance(Session.class);
                for (Row row : session.execute(cql).all()) {
                    // Column 0 is the partition key, column 1 the count(*) of that partition.
                    Comparable key = (Comparable) row.get(0, partitionQuery.context.getPartitionKeyType());
                    counts.put(key, row.get(1, Long.class));
                    keys.add(key);
                }
            } catch (Exception e) {
                logger.warn("queryPartitions failed: " + cql, e);
                throw e;
            }
        });

        final Map<Comparable<Object>, Long> ordered = sortPartitions(counts, keys);
        final long elapsed = System.currentTimeMillis() - startedAt;
        this.partitionTiming = "queryPartition for " + partitionQuery.table + " took " + elapsed + "ms";
        return ordered;
    }

    /**
     * Returns the row count of every pending partition, using the chunked asynchronous query when
     * possible and {@link #queryPartitions(PartitionQuery)} otherwise.
     *
     * <p>The synchronous query is used when the last-update value is not numeric, when the context
     * has no time unit, or when no positive async step is configured. If the asynchronous query
     * itself fails, the failure is logged with its cause and the synchronous query is used as a
     * fallback.
     *
     * @param partitionQuery query parameters
     * @return partition key to row count, in ascending key order
     */
    public Map<Comparable<Object>, Long> queryPartitions2(PartitionQuery<C> partitionQuery) {
        try {
            // Validation only: the async path does arithmetic on the last-update value.
            new BigDecimal(partitionQuery.lastUpdate.getLastUpdate());
        } catch (RuntimeException e) {
            logger.warn("Cannot parse latUpdate " + partitionQuery.lastUpdate.getLastUpdate() + " for " + partitionQuery.lastUpdate.getExtractor());
            return queryPartitions(partitionQuery);
        }

        // Kept outside any try block so that a failing synchronous query is neither mislabelled
        // as a parse error nor executed a second time.
        if (partitionQuery.context.getTimeUnit() == null || partitionQuery.asyncStep == null || partitionQuery.asyncStep.intValue() <= 0) {
            return queryPartitions(partitionQuery);
        }

        try {
            long startedAt = System.currentTimeMillis();
            String cql = buildQuery(partitionQuery, QUERY_PARTITION, ASYNC_QUERY_PARTITION);
            Map<Comparable<Object>, Long> counts = Collections.synchronizedMap(new TreeMap<>());
            List<Comparable<Object>> keys = Collections.synchronizedList(new LinkedList<>());
            asyncQuery(partitionQuery, cql, row -> {
                // Column 0 is the partition key, column 1 the count(*) of that partition.
                Comparable key = (Comparable) row.get(0, partitionQuery.context.getPartitionKeyType());
                counts.put(key, row.get(1, Long.class));
                keys.add(key);
            });
            Map<Comparable<Object>, Long> ordered = sortPartitions(counts, keys);
            this.partitionTiming = "queryPartition2 for " + partitionQuery.table + " took " + (System.currentTimeMillis() - startedAt) + "ms";
            return ordered;
        } catch (Exception e) {
            // Same fallback as before, but report what actually went wrong.
            logger.warn("Async partition query failed for " + partitionQuery.table + ", falling back to queryPartitions", e);
            return queryPartitions(partitionQuery);
        }
    }

    /**
     * Builds the CQL text for an async query.
     *
     * @param partitionQuery supplies the partition key column, the table and the context
     * @param str            range template, used when the context does not use futures; its
     *                       start/end placeholders become the bind markers {@code :start}/{@code :end}
     * @param str2           single-partition template, used when the context uses futures
     * @return the CQL text with the placeholders substituted
     */
    private String buildQuery(PartitionQuery<C> partitionQuery, String str, String str2) {
        if (partitionQuery.context.isAsyncUseFutures()) {
            return TextBuilder.using(str2).build("pk", partitionQuery.partitionKey, "table", partitionQuery.table);
        }
        return TextBuilder.using(str).build(
                "pk", partitionQuery.partitionKey,
                "table", partitionQuery.table,
                "start", ":start",
                "end", ":end");
    }

    /**
     * Produces an insertion-ordered copy of {@code map} whose iteration order is the natural
     * order of the keys in {@code list}.
     *
     * @param map  partition key to row count
     * @param list partition keys; sorted in place by this method
     * @return a new map containing one entry per key of {@code list}, in ascending key order
     */
    private Map<Comparable<Object>, Long> sortPartitions(Map<Comparable<Object>, Long> map, List<Comparable<Object>> list) {
        list.sort(null);
        // Load factor 1 with capacity size + 1: the map never needs to rehash while being filled.
        final Map<Comparable<Object>, Long> ordered = new LinkedHashMap<>(list.size() + 1, 1.0f);
        list.forEach(key -> ordered.put(key, map.get(key)));
        return ordered;
    }

    /**
     * Determines the numeric bounds of an async query.
     *
     * <p>The lower bound is the last-update value. When that value is zero or negative, it is
     * replaced by the cutoff the context computes for the current time and its configured
     * maximum past. The upper bound is the context's cutoff.
     *
     * @param partitionQuery query parameters
     * @return pair of (lower bound, upper bound)
     */
    private Pair<BigDecimal, BigDecimal> fromAndTo(PartitionQuery<C> partitionQuery) {
        final C context = partitionQuery.context;
        BigDecimal from = new BigDecimal(partitionQuery.lastUpdate.getLastUpdate());
        final BigDecimal to = new BigDecimal(context.getCutoff().toString());
        if (from.signum() <= 0) {
            from = new BigDecimal(context.getCutoff(System.currentTimeMillis(), context.getMaxPast()).toString());
        }
        return new Pair<>(from, to);
    }

    /**
     * Executes {@code str} over the whole interval between the last update and the cutoff, one
     * round of ranges at a time, passing every returned row to {@code consumer}.
     *
     * <p>Each round asks {@link #getAsyncRanges} for the next ranges and stops when none are left.
     * Because range bounds are exclusive, the next round starts at the previous round's last end
     * minus one.
     *
     * @param partitionQuery query parameters (bounds, step, chunk limit, retries)
     * @param str            CQL text to execute
     * @param consumer       receives every row returned by the query
     */
    protected void asyncQuery(PartitionQuery<C> partitionQuery, String str, Consumer<Row> consumer) {
        Pair<BigDecimal, BigDecimal> bounds = fromAndTo(partitionQuery);
        BigDecimal from = (BigDecimal) bounds.key();
        BigDecimal to = (BigDecimal) bounds.value();
        int step = partitionQuery.asyncStep.intValue();
        int maxChunks = partitionQuery.asyncMaxChunkSize.intValue();

        List<Range> ranges = getAsyncRanges(str, from, to, step, maxChunks);
        while (!ranges.isEmpty()) {
            execAsyncQueryWithRetries(partitionQuery, str, ranges, consumer);
            BigDecimal nextFrom = ranges.get(ranges.size() - 1).end.subtract(BigDecimal.ONE);
            ranges = getAsyncRanges(str, nextFrom, to, step, maxChunks);
        }
    }

    /**
     * Runs one round of ranges, retrying a failed attempt up to {@code partitionQuery.retries}
     * times with the context's retry sleep in between. The query is always attempted at least
     * once, even if the retry count was set to a negative value.
     *
     * <p>The last failure is rethrown when the retries are exhausted, or immediately when the
     * thread is interrupted while waiting to retry (the interrupt flag is restored first).
     */
    private void execAsyncQueryWithRetries(PartitionQuery<C> partitionQuery, String str, List<Range> ranges, Consumer<Row> consumer) {
        C context = partitionQuery.context;
        for (int retriesLeft = Math.max(partitionQuery.retries, 0); ; retriesLeft--) {
            try {
                execAsyncQuery(context, str, ranges, consumer);
                return;
            } catch (Exception e) {
                if (retriesLeft == 0) {
                    logger.warn("Cannot transmutate " + str, e);
                    throw e;
                }
                String info = "extractor=" + context.extractor() + " sourceClass=" + context.getSourceClass() + " tableName=" + context.tableName();
                logger.warn("Cannot transmutate " + str + ", " + retriesLeft + " retry attempts left, " + info, e);
                try {
                    Thread.sleep(context.getRetrySleep());
                } catch (InterruptedException interrupted) {
                    // Someone asked this thread to stop: do not burn through the remaining
                    // retries without any back-off, surface the query failure right away.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    /**
     * Executes {@code str} once per range and passes the resulting rows to {@code consumer}.
     *
     * <p>When the context uses futures, the work is delegated to {@link #execAsyncQuery2}.
     * Otherwise each range's bounds are converted to the context's partition key type and bound
     * to the {@code start} and {@code end} markers of the statement.
     *
     * @param c        context supplying the session and the partition key type
     * @param str      CQL text to execute
     * @param list     ranges to query
     * @param consumer receives every returned row, in execution order
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void execAsyncQuery(C c, String str, List<Range> list, Consumer<Row> consumer) {
        if (c.isAsyncUseFutures()) {
            execAsyncQuery2(c, str, list, consumer);
            return;
        }
        c.open().accept(Sibyl.class, sibyl -> {
            sibyl.createAsync(str).execute(list, (range, bound) -> {
                try {
                    // toPlainString for both bounds: toString may switch to scientific notation,
                    // which the converter is not guaranteed to understand.
                    Object startKey = this.converter.convert(range.start.toPlainString(), c.getPartitionKeyType(), (ObjectConverter.InstanceCreationListener) null);
                    Object endKey = this.converter.convert(range.end.toPlainString(), c.getPartitionKeyType(), (ObjectConverter.InstanceCreationListener) null);
                    // The converted values have the partition key type, which need not be String,
                    // so they are bound without a String cast.
                    bound.set("start", startKey, (Class) c.getPartitionKeyType());
                    bound.set("end", endKey, (Class) c.getPartitionKeyType());
                } catch (IOException e) {
                    throw new SystemException(e);
                }
            }).inExecutionOrderRows(consumer);
        });
    }

    /**
     * Executes the single-partition statement {@code str} for every whole-number key strictly
     * inside each range and passes the non-empty results to {@code consumer}.
     *
     * <p>For a range {@code (start, end)} the keys {@code start + 1, start + 2, ...} up to but
     * excluding {@code end} are queried. Rows whose first column is null are skipped.
     *
     * @param c        context supplying the session and the partition key type
     * @param str      CQL text with a {@code :pk} bind marker
     * @param list     ranges to expand into individual keys
     * @param consumer receives every row whose first column is not null, in execution order
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void execAsyncQuery2(C c, String str, List<Range> list, Consumer<Row> consumer) {
        for (Range range : list) {
            List<BigDecimal> keys = new LinkedList<>();
            // Both range bounds are exclusive.
            for (BigDecimal key = range.start.add(BigDecimal.ONE); key.compareTo(range.end) < 0; key = key.add(BigDecimal.ONE)) {
                keys.add(key);
            }
            c.open().accept(Sibyl.class, sibyl -> {
                sibyl.createAsync(str).execute(keys, (key, bound) -> {
                    try {
                        // The converted value has the partition key type, which need not be
                        // String, so it is bound without a String cast.
                        Object pk = this.converter.convert(key.toPlainString(), c.getPartitionKeyType(), (ObjectConverter.InstanceCreationListener) null);
                        bound.set("pk", pk, (Class) c.getPartitionKeyType());
                    } catch (IOException e) {
                        throw new SystemException(e);
                    }
                }).inExecutionOrderRows(row -> {
                    if (row.isNull(0)) {
                        return;
                    }
                    consumer.accept(row);
                });
            });
        }
    }

    /**
     * Splits the interval starting at {@code bigDecimal} into consecutive ranges of width
     * {@code i}, capped at {@code bigDecimal2}.
     *
     * <p>Range bounds are exclusive, so each range starts at the previous range's end minus one.
     * An empty list is returned when the first range would span at most one unit, which is always
     * the case for a step of 1. Otherwise at least one range is returned. Generation stops as soon
     * as a range's end would reach {@code bigDecimal2} or {@code i2} ranges have been produced,
     * and the caller is expected to ask again for the remainder.
     *
     * @param str         query text, used only in the warning message
     * @param bigDecimal  exclusive lower bound of the first range
     * @param bigDecimal2 upper limit for any range end
     * @param i           width of one range
     * @param i2          maximum number of ranges to return
     * @return the generated ranges, possibly empty
     */
    protected List<Range> getAsyncRanges(String str, BigDecimal bigDecimal, BigDecimal bigDecimal2, int i, int i2) {
        final BigDecimal step = new BigDecimal(i);
        BigDecimal lower = bigDecimal;
        BigDecimal upper = bigDecimal.add(step).min(bigDecimal2);
        if (upper.subtract(lower).compareTo(BigDecimal.ONE) <= 0) {
            return new ArrayList<>();
        }

        final List<Range> ranges = new LinkedList<>();
        do {
            ranges.add(new Range(lower, upper));
            lower = upper.subtract(BigDecimal.ONE);
            upper = lower.add(step).min(bigDecimal2);
        } while (upper.compareTo(bigDecimal2) < 0 && ranges.size() < i2);

        // Sanity check: the first range must begin at the requested lower bound.
        final Range first = ranges.get(0);
        if (bigDecimal.compareTo(first.start) != 0) {
            logger.warn("async range error for {}, first {} ", str, first);
        }
        return new ArrayList<>(ranges);
    }

    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy
    public List<S> extract(C c) {
        return (List) c.open().apply(Sibyl.class, sibyl -> {
            String build = TextBuilder.using("select * from ${tbl} where ${pk} = :partitionKey").build("tbl", c.tableName(), "pk", c.getInspector().getPartitionKeyColumn(0));
            AsyncPrepared createAsync = sibyl.createAsync(c.getPreparedStatements().computeIfAbsent("extract", str -> {
                return sibyl.getSession().prepare(build);
            }));
            for (Comparable<?> comparable : c.getPartitions()) {
                createAsync.execute(bound -> {
                    bound.set("partitionKey", (String) comparable, (Class<String>) comparable.getClass());
                });
            }
            ArrayList arrayList = new ArrayList();
            createAsync.inExecutionOrder(asyncResultSet -> {
                arrayList.addAll(sibyl.mapAll(c.getSourceClass(), asyncResultSet));
            });
            return arrayList;
        });
    }

    @Override // net.e6tech.elements.cassandra.etl.BatchStrategy, net.e6tech.elements.cassandra.etl.Strategy
    public int run(C c) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        c.initialize();
        PartitionQuery<C> partitionQuery = new PartitionQuery<>(c);
        Map<Comparable<Object>, Long> queryPartitions2 = queryPartitions2(partitionQuery);
        c.reset();
        LinkedList linkedList = new LinkedList();
        boolean isEmpty = queryPartitions2.isEmpty();
        while (queryPartitions2.size() > 0) {
            LastUpdate lastUpdate = c.getLastUpdate();
            linkedList.clear();
            boolean z = true;
            long j = 0;
            for (Map.Entry<Comparable<Object>, Long> entry : queryPartitions2.entrySet()) {
                if (!z) {
                    if (j + entry.getValue().longValue() > c.getBatchSize()) {
                        break;
                    }
                    j += entry.getValue().longValue();
                    linkedList.add(entry.getKey());
                } else {
                    linkedList.add(entry.getKey());
                    z = false;
                    j = entry.getValue().longValue();
                }
            }
            int run = run(c, linkedList);
            i += run;
            if (logger.isInfoEnabled()) {
                logger.info("Batch loaded {} instances of {}", Integer.valueOf(run), c.extractor());
            }
            for (Comparable<?> comparable : linkedList) {
                queryPartitions2.remove(comparable);
                lastUpdate.update(comparable);
            }
            c.saveLastUpdate(lastUpdate);
        }
        if (c.getTimeUnit() != null && isEmpty && c.isAsyncUseFutures()) {
            LastUpdate lastUpdate2 = c.getLastUpdate();
            BigDecimal bigDecimal = new BigDecimal(lastUpdate2.getLastUpdate());
            BigDecimal bigDecimal2 = new BigDecimal(partitionQuery.context.getCutoff().toString());
            if (bigDecimal2.compareTo(bigDecimal) > 0) {
                BigDecimal timeExpressedInCorrectTimeUnit = timeExpressedInCorrectTimeUnit(c.getTimeUnit(), System.currentTimeMillis());
                if (bigDecimal2.compareTo(timeExpressedInCorrectTimeUnit) > 0) {
                    lastUpdate2.update(timeExpressedInCorrectTimeUnit.subtract(BigDecimal.ONE));
                } else {
                    lastUpdate2.update(bigDecimal2.subtract(BigDecimal.ONE));
                }
                c.saveLastUpdate(lastUpdate2);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("{}.run for {} loaded {} instances of {} took {}ms, {}", new Object[]{getClass().getSimpleName(), c.extractor(), Integer.valueOf(i), c.getSourceClass().getSimpleName(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), this.partitionTiming});
        }
        c.reset();
        return i;
    }

    /**
     * Converts a millisecond timestamp into whole units of {@code timeUnit}, rounding down.
     *
     * @param timeUnit target unit; DAYS, HOURS, MINUTES and SECONDS are converted
     * @param j        time in milliseconds
     * @return the number of whole units, or the millisecond value unchanged for any other unit
     */
    public BigDecimal timeExpressedInCorrectTimeUnit(TimeUnit timeUnit, long j) {
        final BigDecimal millis = new BigDecimal(j);
        if (timeUnit == TimeUnit.DAYS) {
            return millis.divide(new BigDecimal(ETLContext.DAY), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.HOURS) {
            return millis.divide(new BigDecimal(ETLContext.HOUR), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.MINUTES) {
            return millis.divide(new BigDecimal(ETLContext.MINUTE), 0, RoundingMode.DOWN);
        }
        if (timeUnit == TimeUnit.SECONDS) {
            return millis.divide(new BigDecimal(1000L), 0, RoundingMode.DOWN);
        }
        return millis;
    }

    /**
     * Extracts and loads one batch of partitions.
     *
     * @param c    context to run against; receives {@code list} as its current partitions
     * @param list partition keys of the batch
     * @return the result of {@link #load} for the extracted instances
     */
    public int run(C c, List<Comparable<?>> list) {
        c.setPartitions(list);
        // The context is passed as is; it is not a PartitionStrategy and must not be cast to one.
        return load(c, extract(c));
    }

    /**
     * Runs {@link #QUERY_RANGE} synchronously and returns the distinct partition keys strictly
     * between the context's last update and its cutoff, in ascending order.
     * Also records how long the query took in {@code partitionTiming}.
     *
     * @param partitionQuery query parameters; bounds, key column and table are read from its context
     * @return sorted partition keys
     */
    public List<Comparable<Object>> queryRange(PartitionQuery<C> partitionQuery) {
        final long startedAt = System.currentTimeMillis();
        final C context = partitionQuery.context;
        final LastUpdate lastUpdate = context.getLastUpdate();
        final Comparable<?> cutoff = context.getCutoff();
        final String keyColumn = context.getInspector().getPartitionKeyColumn(0);
        final String table = context.tableName();
        final List<Comparable<Object>> keys = Collections.synchronizedList(new LinkedList<>());
        final String cql = TextBuilder.using(QUERY_RANGE).build(
                "pk", keyColumn,
                "table", table,
                "start", lastUpdate.getLastUpdate(),
                "end", cutoff);

        context.open().accept(Resources.class, resources -> {
            try {
                Session session = (Session) resources.getInstance(Session.class);
                for (Row row : session.execute(cql).all()) {
                    keys.add((Comparable) row.get(0, context.getPartitionKeyType()));
                }
            } catch (Exception e) {
                logger.warn("PartitionStrategy.queryRange failed: " + cql, e);
                throw e;
            }
        });

        keys.sort(null);
        final long elapsed = System.currentTimeMillis() - startedAt;
        this.partitionTiming = "queryRange for " + partitionQuery.table + " took " + elapsed + "ms";
        return keys;
    }

    /**
     * Returns the pending partition keys, using the chunked asynchronous query when possible and
     * {@link #queryRange(PartitionQuery)} otherwise.
     *
     * <p>The synchronous query is used when the last-update value is not numeric, when the context
     * has no time unit, or when no positive async step is configured. If the asynchronous query
     * itself fails, the failure is logged with its cause and the synchronous query is used as a
     * fallback.
     *
     * @param partitionQuery query parameters
     * @return sorted partition keys
     */
    public List<Comparable<Object>> queryRange2(PartitionQuery<C> partitionQuery) {
        try {
            // Validation only: the async path does arithmetic on the last-update value.
            new BigDecimal(partitionQuery.lastUpdate.getLastUpdate());
        } catch (RuntimeException e) {
            logger.warn("Cannot parse latUpdate " + partitionQuery.lastUpdate.getLastUpdate() + " for " + partitionQuery.lastUpdate.getExtractor());
            return queryRange(partitionQuery);
        }

        // Kept outside any try block so that a failing synchronous query is neither mislabelled
        // as a parse error nor executed a second time.
        if (partitionQuery.context.getTimeUnit() == null || partitionQuery.asyncStep == null || partitionQuery.asyncStep.intValue() <= 0) {
            return queryRange(partitionQuery);
        }

        try {
            long startedAt = System.currentTimeMillis();
            List<Comparable<Object>> keys = Collections.synchronizedList(new LinkedList<>());
            asyncQuery(partitionQuery, buildQuery(partitionQuery, QUERY_RANGE, ASYNC_QUERY_RANGE), row -> {
                keys.add((Comparable) row.get(0, partitionQuery.context.getPartitionKeyType()));
            });
            keys.sort(null);
            this.partitionTiming = "queryRange2 for " + partitionQuery.table + " took " + (System.currentTimeMillis() - startedAt) + "ms";
            return keys;
        } catch (Exception e) {
            // Same fallback as before, but report what actually went wrong.
            logger.warn("Async range query failed for " + partitionQuery.table + ", falling back to queryRange", e);
            return queryRange(partitionQuery);
        }
    }

    /**
     * Processes all pending partition keys in groups of the context's batch size.
     *
     * <p>Each full group, and the final partial group if any, is handed to
     * {@link #runPartitions(List, PartitionContext)}. Afterwards the last-update marker is
     * advanced to the group's last key and saved.
     *
     * <p>NOTE(review): the log line reports the summed load results, but the method returns the
     * number of partition keys found. Confirm which value callers expect.
     *
     * @param c context to run against
     * @return the number of partition keys returned by the range query
     */
    public int runPartitions(C c) {
        final long startedAt = System.currentTimeMillis();
        c.initialize();
        final List<Comparable<Object>> partitions = queryRange2(new PartitionQuery<>(c));
        final List<Comparable<?>> batch = new ArrayList<>(c.getBatchSize());
        int loaded = 0;
        for (Comparable<Object> partition : partitions) {
            batch.add(partition);
            if (batch.size() == c.getBatchSize()) {
                loaded += flushPartitionBatch(batch, c);
            }
        }
        if (!batch.isEmpty()) {
            loaded += flushPartitionBatch(batch, c);
        }
        if (logger.isInfoEnabled()) {
            logger.info("{}.runPartitions for {} loaded {} instances of {} took {}ms, {}", new Object[]{getClass().getSimpleName(), c.extractor(), Integer.valueOf(loaded), c.getSourceClass().getSimpleName(), Long.valueOf(System.currentTimeMillis() - startedAt), this.partitionTiming});
        }
        return partitions.size();
    }

    /**
     * Loads one group of partition keys, advances and saves the last-update marker to the group's
     * last key, and empties the group.
     *
     * @return the load result for the group
     */
    private int flushPartitionBatch(List<Comparable<?>> batch, C c) {
        final LastUpdate lastUpdate = c.getLastUpdate();
        final int result = runPartitions(batch, c);
        lastUpdate.update(batch.get(batch.size() - 1));
        c.saveLastUpdate(lastUpdate);
        batch.clear();
        return result;
    }

    /**
     * Sets the given partition keys on the context and passes them to its load delegate.
     *
     * @param list partition keys to process
     * @param c    context supplying the load delegate
     * @return the load delegate's result
     */
    public int runPartitions(List<Comparable<?>> list, C c) {
        c.setPartitions(list);
        final int result = c.getLoadDelegate().applyAsInt(list);
        return result;
    }
}
