package io.activej.etl;

import io.activej.async.process.AsyncCollector;
import io.activej.common.Preconditions;
import io.activej.datastream.StreamConsumerWithResult;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.processor.StreamSplitter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/etl/LogDataConsumerSplitter.class */
/**
 * A {@link LogDataConsumer} that fans one input stream of {@code T} items out to several
 * downstream {@link LogDataConsumer}s and merges the {@code List<D>} results they produce
 * into a single list.
 *
 * <p>Subclasses implement {@link #createSplitter(Context)} and, inside it, call
 * {@link Context#addOutput(LogDataConsumer)} once per downstream consumer.
 * {@code createSplitter} is invoked more than once with the same {@link Context}:
 * <ol>
 *   <li>a first "registration" pass, made directly from {@link #consume()}, during which
 *       {@code addOutput} only records the consumers and returns {@code null};</li>
 *   <li>a "binding" pass, made from the factory handed to {@link StreamSplitter}, during which
 *       {@code addOutput} returns the acceptors of the splitter outputs, in order.</li>
 * </ol>
 * Because outputs are matched to consumers purely by call order, implementations must call
 * {@code addOutput} the same number of times and in the same order on every pass.
 *
 * @param <T> type of the items accepted by this consumer
 * @param <D> element type of the result lists produced by the downstream consumers
 */
public abstract class LogDataConsumerSplitter<T, D> implements LogDataConsumer<T, D> {

    /**
     * Registration handle passed to {@link LogDataConsumerSplitter#createSplitter(Context)}.
     * Records the downstream consumers on the first pass and hands out the matching
     * output acceptors on later passes.
     */
    public final class Context {
        /** Consumers recorded during the registration pass, in registration order. */
        private final List<LogDataConsumer<?, D>> logDataConsumers = new ArrayList<>();

        /** Acceptors of the splitter outputs; {@code null} while still in the registration pass. */
        private Iterator<? extends StreamDataAcceptor<?>> acceptors;

        public Context() {
        }

        /**
         * Registers a downstream consumer, or returns the acceptor bound to it.
         *
         * <p>While no acceptors have been supplied yet, the consumer is recorded and
         * {@code null} is returned. Once acceptors are available, the argument is ignored and
         * the next acceptor in order is returned.
         *
         * @param logDataConsumer downstream consumer to register
         * @param <X>             item type of the downstream consumer
         * @return {@code null} during the registration pass, otherwise the acceptor for the
         *         next output
         * @throws java.util.NoSuchElementException if called more times than there are acceptors
         */
        @Nullable
        public final <X> StreamDataAcceptor<X> addOutput(LogDataConsumer<X, D> logDataConsumer) {
            if (this.acceptors == null) {
                // Registration pass: only remember the consumer, there is nothing to bind yet.
                this.logDataConsumers.add(logDataConsumer);
                return null;
            }
            // Binding pass: outputs were created in registration order, so the next acceptor
            // belongs to the consumer registered at this position. The element type cannot be
            // checked at runtime, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            StreamDataAcceptor<X> acceptor = (StreamDataAcceptor<X>) this.acceptors.next();
            return acceptor;
        }
    }

    /**
     * Builds the splitting consumer: one splitter output is created per registered downstream
     * consumer and streamed into it, and the downstream results are concatenated via
     * {@link List#addAll}.
     *
     * @return a consumer for the splitter input, paired with the promise of the merged results
     * @throws IllegalStateException presumably, if {@code createSplitter} registered no outputs
     *                               (raised by {@code Preconditions.checkState} - exception type
     *                               not visible here, confirm)
     */
    @Override // io.activej.etl.LogDataConsumer
    public StreamConsumerWithResult<T, List<D>> consume() {
        AsyncCollector<List<D>> diffsCollector = AsyncCollector.create(new ArrayList<>());

        // Registration pass: context.acceptors is still null, so addOutput() only records consumers.
        Context context = new Context();
        createSplitter(context);

        // Binding pass: runs whenever the splitter invokes this factory with its output acceptors.
        // Outputs are typed as Object because every registered consumer has its own item type.
        StreamSplitter<T, Object> splitter = StreamSplitter.create(outputAcceptors -> {
            context.acceptors = Arrays.asList(outputAcceptors).iterator();
            return createSplitter(context);
        });

        Preconditions.checkState(!context.logDataConsumers.isEmpty());

        for (LogDataConsumer<?, D> logDataConsumer : context.logDataConsumers) {
            // The item type is only known to the subclass; the acceptor returned by addOutput()
            // is what actually feeds this output, so treating it as Object here is safe.
            @SuppressWarnings("unchecked")
            LogDataConsumer<Object, D> untypedConsumer = (LogDataConsumer<Object, D>) logDataConsumer;
            diffsCollector.addPromise(splitter.newOutput().streamTo(untypedConsumer.consume()), List::addAll);
        }

        return StreamConsumerWithResult.of(splitter.getInput(), diffsCollector.run().get());
    }

    /**
     * Declares the outputs of this splitter and creates the acceptor that routes incoming items.
     *
     * <p>Implementations must call {@link Context#addOutput(LogDataConsumer)} for every
     * downstream consumer, in a stable order. The value returned from the first (registration)
     * invocation is discarded by {@link #consume()}; on that pass {@code addOutput} returns
     * {@code null}, so the returned acceptor must not be used before the binding pass.
     *
     * @param context registration handle shared between all invocations for one {@code consume()} call
     * @return acceptor that receives the input items and forwards them to the output acceptors
     */
    protected abstract StreamDataAcceptor<T> createSplitter(@NotNull LogDataConsumerSplitter<T, D>.Context context);
}
