package edu.iu.dsc.tws.executor.comms.streaming;

import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.ReduceFunction;
import edu.iu.dsc.tws.api.comms.SingularReceiver;
import edu.iu.dsc.tws.api.compute.IFunction;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskMessage;
import edu.iu.dsc.tws.api.compute.graph.Edge;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.stream.SReduce;
import edu.iu.dsc.tws.executor.comms.AbstractParallelOperation;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/* loaded from: input_file:edu/iu/dsc/tws/executor/comms/streaming/ReduceStreamingOperation.class */
public class ReduceStreamingOperation extends AbstractParallelOperation {
    protected SReduce op;
    protected IFunction function;

    /* loaded from: input_file:edu/iu/dsc/tws/executor/comms/streaming/ReduceStreamingOperation$FinalSingularReceiver.class */
    private class FinalSingularReceiver implements SingularReceiver {
        private FinalSingularReceiver() {
        }

        public void init(Config config, Set<Integer> set) {
        }

        public boolean receive(int i, Object obj) {
            TaskMessage taskMessage = new TaskMessage(obj, ReduceStreamingOperation.this.inEdge, i);
            BlockingQueue blockingQueue = (BlockingQueue) ReduceStreamingOperation.this.outMessages.get(Integer.valueOf(i));
            return (blockingQueue == null || blockingQueue.offer(taskMessage)) ? true : true;
        }
    }

    /* loaded from: input_file:edu/iu/dsc/tws/executor/comms/streaming/ReduceStreamingOperation$ReduceFunctionImpl.class */
    private class ReduceFunctionImpl implements ReduceFunction {
        private IFunction fn;

        ReduceFunctionImpl(IFunction iFunction) {
            this.fn = iFunction;
        }

        public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        }

        public Object reduce(Object obj, Object obj2) {
            return this.fn.onMessage(obj, obj2);
        }
    }

    public ReduceStreamingOperation(Config config, Communicator communicator, LogicalPlan logicalPlan, IFunction iFunction, Set<Integer> set, Set<Integer> set2, Edge edge) {
        super(config, communicator, logicalPlan, edge.getName());
        this.function = iFunction;
        if (set.size() == 0) {
            throw new IllegalArgumentException("Sources should have more than 0 elements");
        }
        if (iFunction == null) {
            throw new IllegalArgumentException("Operation expects a function");
        }
        if (set2.size() > 1) {
            throw new RuntimeException("Reduce can only have one target: " + set2);
        }
        this.op = new SReduce(this.channel.newWithConfig(edge.getProperties()), this.logicalPlan, set, set2.iterator().next().intValue(), edge.getDataType(), new ReduceFunctionImpl(this.function), new FinalSingularReceiver(), edge.getEdgeID().nextId(), edge.getMessageSchema());
    }

    public boolean send(int i, IMessage iMessage, int i2) {
        return this.op.reduce(i, iMessage.getContent(), i2);
    }

    /* renamed from: getOp, reason: merged with bridge method [inline-methods] */
    public SReduce m0getOp() {
        return this.op;
    }
}
