package io.activej.dataflow.collector;

import io.activej.common.builder.AbstractBuilder;
import io.activej.dataflow.DataflowClient;
import io.activej.dataflow.dataset.Dataset;
import io.activej.dataflow.dataset.DatasetUtils;
import io.activej.dataflow.graph.DataflowContext;
import io.activej.dataflow.graph.DataflowGraph;
import io.activej.dataflow.graph.Partition;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.node.Node;
import io.activej.dataflow.node.Nodes;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/activej/dataflow/collector/AbstractCollector.class */
/**
 * Base class for collectors that wire a {@link Dataset} into a {@link DataflowGraph} and
 * combine the per-channel download streams into a single result stream.
 *
 * <p>Subclasses supply the combining strategy through {@link #createAccumulator()},
 * {@link #accumulate(Object, StreamSupplier)} and {@link #getResult(Object)}.
 *
 * @param <T> type of the items produced by the input dataset and by the resulting stream
 * @param <A> type of the accumulator that gathers the per-channel streams
 */
public abstract class AbstractCollector<T, A> extends AbstractReactive implements ICollector<T> {
    /** Sentinel value of {@link #limit} meaning that no limiting step is added to the graph. */
    private static final long NO_LIMIT = -1L;

    /** Dataset whose channels are collected. */
    protected final Dataset<T> input;

    /** Client used to download each channel from the partition that owns it. */
    protected final DataflowClient client;

    /**
     * Limit forwarded to {@code DatasetUtils.limitStream} for every channel, or {@code -1}
     * when no limit is configured.
     * NOTE(review): presumably the maximum number of items taken from each stream - confirm
     * against {@code DatasetUtils.limitStream}.
     */
    protected long limit = NO_LIMIT;

    /**
     * Builder that configures the enclosing collector in place and hands that same instance
     * out as the build result.
     *
     * @param <Self> concrete builder type, returned from the fluent setters
     * @param <C>    concrete collector type produced by the builder
     */
    public abstract class Builder<Self extends AbstractCollector<T, A>.Builder<Self, C>, C extends AbstractCollector<T, A>> extends AbstractBuilder<Self, C> {
        public Builder() {
        }

        /**
         * Sets the limit that {@link AbstractCollector#compile(DataflowGraph)} applies to
         * every input channel.
         *
         * @param limit the limit to apply; {@code -1} leaves the channels unlimited
         * @return this builder
         */
        @SuppressWarnings("unchecked") // Self is by contract the runtime type of this builder
        public final Self withLimit(long limit) {
            checkNotBuilt(this);
            AbstractCollector.this.limit = limit;
            return (Self) this;
        }

        /**
         * Build hook of {@link AbstractBuilder}: returns the enclosing collector, which the
         * setters of this builder have already configured.
         *
         * @return the enclosing collector
         */
        @Override
        @SuppressWarnings("unchecked") // C is by contract the runtime type of the enclosing collector
        protected C doBuild() {
            return (C) AbstractCollector.this;
        }

        /**
         * Alias of {@link #doBuild()} under the name the decompiler assigned to it; kept only
         * so that code referring to the generated name keeps compiling.
         *
         * @return the enclosing collector
         */
        public C m43doBuild() {
            return doBuild();
        }
    }

    /**
     * Creates a collector without a limit.
     *
     * @param reactor reactor passed on to {@link AbstractReactive}
     * @param input   dataset whose channels are collected
     * @param client  client used to download the channels
     */
    public AbstractCollector(Reactor reactor, Dataset<T> input, DataflowClient client) {
        super(reactor);
        this.input = input;
        this.client = client;
    }

    /**
     * Creates the accumulator that one {@link #compile(DataflowGraph)} call feeds with streams.
     *
     * @return a new accumulator
     */
    protected abstract A createAccumulator();

    /**
     * Adds the download stream of one channel to the accumulator.
     *
     * @param a              accumulator obtained from {@link #createAccumulator()}
     * @param streamSupplier download stream of a single channel
     */
    protected abstract void accumulate(A a, StreamSupplier<T> streamSupplier);

    /**
     * Turns the accumulator into the stream returned by {@link #compile(DataflowGraph)}.
     *
     * @param a accumulator that has received every channel stream
     * @return the combined result stream
     */
    protected abstract StreamSupplier<T> getResult(A a);

    /**
     * Registers the nodes needed to collect the input dataset in the given graph and returns
     * the stream that combines all of its channels.
     *
     * <p>When a limit is configured, every channel is first replaced by the stream ids that
     * {@code DatasetUtils.limitStream} returns for it. An upload node is then added on the
     * partition owning each channel, and the matching download stream is passed to
     * {@link #accumulate(Object, StreamSupplier)}.
     *
     * @param dataflowGraph graph that receives the limiting and upload nodes
     * @return the stream produced by {@link #getResult(Object)}
     */
    @Override // io.activej.dataflow.collector.ICollector
    public final StreamSupplier<T> compile(DataflowGraph dataflowGraph) {
        Reactive.checkInReactorThread(this);
        DataflowContext context = DataflowContext.of(dataflowGraph);
        List<StreamId> streamIds = this.input.channels(context);

        if (this.limit != NO_LIMIT) {
            // A single node index is shared by the limiting step of every channel.
            int limiterIndex = context.generateNodeIndex();
            List<StreamId> limitedIds = new ArrayList<>(streamIds.size());
            for (StreamId streamId : streamIds) {
                limitedIds.addAll(DatasetUtils.limitStream(dataflowGraph, limiterIndex, this.limit, streamId));
            }
            streamIds = limitedIds;
        }

        A accumulator = createAccumulator();
        // Likewise, all upload nodes share one node index.
        int uploadIndex = context.generateNodeIndex();
        for (StreamId streamId : streamIds) {
            Node uploadNode = Nodes.upload(uploadIndex, this.input.streamSchema(), streamId);
            Partition partition = dataflowGraph.getPartition(streamId);
            dataflowGraph.addNode(partition, uploadNode);
            accumulate(accumulator, this.client.download(partition.address(), streamId, this.input.streamSchema()));
        }
        return getResult(accumulator);
    }
}
