package edu.iu.dsc.tws.comms.dfw.io.reduce;

import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.ReduceFunction;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.io.SourceReceiver;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/reduce/ReduceStreamingReceiver.class */
public abstract class ReduceStreamingReceiver extends SourceReceiver {
    private static final Logger LOG = Logger.getLogger(ReduceStreamingReceiver.class.getName());
    protected ReduceFunction reduceFunction;
    private Map<Integer, Queue<Object>> reducedValuesMap;

    public ReduceStreamingReceiver(ReduceFunction reduceFunction) {
        this(0, reduceFunction);
    }

    public ReduceStreamingReceiver(int i, ReduceFunction reduceFunction) {
        this.reducedValuesMap = new HashMap();
        this.reduceFunction = reduceFunction;
        this.destination = i;
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        super.init(config, dataFlowOperation, map);
        Iterator<Map.Entry<Integer, List<Integer>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            this.reducedValuesMap.put(it.next().getKey(), new ArrayBlockingQueue(this.sendPendingMax));
        }
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected boolean sendToTarget(int i, boolean z) {
        Queue<Object> queue = this.reducedValuesMap.get(Integer.valueOf(i));
        while (queue.size() > 0) {
            if (!handleMessage(i, queue.peek(), 0, this.destination)) {
                return false;
            }
            queue.poll();
        }
        return true;
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected boolean aggregate(int i, boolean z, boolean z2) {
        Queue<Object> queue = this.reducedValuesMap.get(Integer.valueOf(i));
        Map<Integer, Queue<Object>> map = this.messages.get(Integer.valueOf(i));
        if (!z2 && !z) {
            return true;
        }
        if (queue.size() >= this.sendPendingMax) {
            return false;
        }
        Object obj = null;
        for (Map.Entry<Integer, Queue<Object>> entry : map.entrySet()) {
            if (obj == null) {
                obj = entry.getValue().poll();
            } else {
                Object poll = entry.getValue().poll();
                if (poll != null) {
                    obj = this.reduceFunction.reduce(obj, poll);
                }
            }
        }
        if (obj == null) {
            return true;
        }
        queue.offer(obj);
        return true;
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected void onSyncEvent(int i, byte[] bArr) {
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected boolean isAllEmpty(int i) {
        return this.reducedValuesMap.get(Integer.valueOf(i)).isEmpty();
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected boolean sendSyncForward(int i) {
        return false;
    }

    @Override // edu.iu.dsc.tws.comms.dfw.io.SourceReceiver
    protected boolean isFilledToSend(int i, boolean z) {
        return this.reducedValuesMap.get(Integer.valueOf(i)).size() > 0;
    }

    public abstract boolean handleMessage(int i, Object obj, int i2, int i3);
}
