package edu.iu.dsc.tws.comms.dfw.io.reduce.keyed;

import edu.iu.dsc.tws.api.comms.ReduceFunction;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/reduce/keyed/KReduceBatchReceiver.class */
public abstract class KReduceBatchReceiver extends KeyedReceiver {
    private static final Logger LOG = Logger.getLogger(KReduceBatchReceiver.class.getName());
    protected ReduceFunction reduceFunction;

    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    protected boolean offerMessage(int i, Object obj) {
        Map<Object, Queue<Object>> map = this.messages.get(Integer.valueOf(i));
        if (!this.isFinalBatchReceiver && map.size() > this.keyLimit) {
            LOG.fine(String.format("Executor %d Partial cannot add any further keys needs flush ", Integer.valueOf(this.executor)));
            moveMessagesToSendQueue(i, map);
            return false;
        }
        if (!(obj instanceof List)) {
            if (reduceAndInsert(map, (Tuple) obj)) {
                return true;
            }
            throw new RuntimeException("Reduce operation should not fail to insert key");
        }
        Iterator it = ((List) obj).iterator();
        while (it.hasNext()) {
            if (!reduceAndInsert(map, (Tuple) it.next())) {
                throw new RuntimeException("Reduce operation should not fail to insert key");
            }
        }
        return true;
    }

    private boolean reduceAndInsert(Map<Object, Queue<Object>> map, Tuple tuple) {
        Object key = tuple.getKey();
        if (!map.containsKey(key)) {
            map.put(key, new ArrayDeque());
            return map.get(tuple.getKey()).offer(tuple.getValue());
        }
        return map.get(tuple.getKey()).offer(this.reduceFunction.reduce(map.get(tuple.getKey()).poll(), tuple.getValue()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    public boolean moveMessagesToSendQueue(int i, Map<Object, Queue<Object>> map) {
        Queue<Object> queue = this.sendQueue.get(Integer.valueOf(i));
        map.entrySet().removeIf(entry -> {
            return queue.offer(new Tuple(entry.getKey(), ((Queue) entry.getValue()).peek()));
        });
        return map.isEmpty();
    }
}
