package edu.iu.dsc.tws.comms.dfw.io;

import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.utils.TaskPlanUtils;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/KeyedReceiver.class */
/**
 * Base receiver that buffers incoming keyed values per target and forwards them through the
 * owning {@link DataFlowOperation}.
 *
 * <p>For every target registered in {@link #init} the receiver keeps:
 * <ul>
 *   <li>a map from key to a queue of buffered values ({@link #messages}),</li>
 *   <li>a queue of tuples that are ready to be sent ({@link #sendQueue}),</li>
 *   <li>a per-source "finished" marker ({@link #finishedSources}),</li>
 *   <li>two completion markers ({@link #batchDone}, {@link #isEmptySent}).</li>
 * </ul>
 *
 * <p>NOTE(review): only {@link #finishedSources} is a concurrent map; the inner maps and the other
 * per-target maps are plain {@code HashMap}s. Whether this class is used from more than one thread
 * cannot be determined from this file.
 */
public abstract class KeyedReceiver implements MessageReceiver {
    private static final Logger LOG = Logger.getLogger(KeyedReceiver.class.getName());

    /**
     * Flag bit (1 &lt;&lt; 26 == 67108864). In this class it is attached to the empty {@code byte[0]}
     * payloads sent once a target is done, and an incoming message carrying it marks its source as
     * finished without being buffered.
     * NOTE(review): presumably mirrors a constant of the project's message-flag definitions; verify.
     */
    private static final int EMPTY_FINISH_FLAG = 1 << 26;

    /**
     * Flag bit (1 &lt;&lt; 30 == 1073741824). In this class it is attached to the final queued message
     * of a target once all of its sources are finished.
     * NOTE(review): presumably mirrors a constant of the project's message-flag definitions; verify.
     */
    private static final int LAST_MESSAGE_FLAG = 1 << 30;

    /** Value assigned to {@link #limitPerKey} by {@link #init}. */
    private static final int DEFAULT_LIMIT_PER_KEY = 100;

    /** Value assigned to {@link #keyLimit} by {@link #init}. */
    private static final int DEFAULT_KEY_LIMIT = 10;

    /** Worker id obtained from the operation's logical plan in {@link #init}; used in messages. */
    protected int executor;
    /** Maximum number of values buffered for one key before that key is flushed. */
    protected int limitPerKey;
    /** Number of buffered keys above which a non-final receiver flushes a target. */
    protected int keyLimit;
    protected DataFlowOperation dataFlowOperation;
    /** Not assigned anywhere in this class (so 0 unless a subclass sets it). */
    protected int destination;
    /** Source id of the first data message seen; used as the source for all outgoing data sends. */
    protected int representSource;
    protected Set<Integer> thisSources;
    /** target -> (source -> finished?). */
    protected Map<Integer, Map<Integer, Boolean>> finishedSources = new ConcurrentHashMap<>();
    /** target -> (key -> buffered values). */
    protected Map<Integer, Map<Object, Queue<Object>>> messages = new HashMap<>();
    /** target -> tuples ready to be handed to the operation. */
    protected Map<Integer, Queue<Object>> sendQueue = new HashMap<>();
    protected Map<Integer, Boolean> batchDone = new HashMap<>();
    protected Map<Integer, Boolean> isEmptySent = new HashMap<>();
    /** When true, the key/value limits are not enforced on aggregated input and nothing is flushed. */
    protected boolean isFinalBatchReceiver = false;
    protected boolean representSourceSet = false;

    /**
     * Binds this receiver to an operation and creates the per-target bookkeeping.
     *
     * @param config not read by this implementation
     * @param dataFlowOperation operation used for all sends and for the logical plan lookup
     * @param map target id to the list of source ids expected for that target
     */
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        this.dataFlowOperation = dataFlowOperation;
        this.executor = this.dataFlowOperation.getLogicalPlan().getThisWorker();
        this.limitPerKey = DEFAULT_LIMIT_PER_KEY;
        this.keyLimit = DEFAULT_KEY_LIMIT;
        this.thisSources = TaskPlanUtils.getTasksOfThisWorker(
                dataFlowOperation.getLogicalPlan(), dataFlowOperation.getSources());

        for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
            Integer target = entry.getKey();
            Map<Integer, Boolean> sourceStates = new HashMap<>();
            for (int source : entry.getValue()) {
                sourceStates.put(source, false);
            }
            this.finishedSources.put(target, sourceStates);
            this.messages.put(target, new HashMap<>());
            this.batchDone.put(target, false);
            this.isEmptySent.put(target, false);
            this.sendQueue.put(target, new ConcurrentLinkedDeque<>());
        }
    }

    /**
     * Handles one incoming message.
     *
     * @param i source id of the message
     * @param i2 not used by this implementation
     * @param i3 target id; must have been registered through {@link #init}
     * @param i4 flag bits of the message
     * @param obj payload; must be a {@code Tuple} or a {@code List} unless the message carries the
     *     empty-finish flag
     * @return {@code true} if the message was consumed, {@code false} if it could not be accepted
     * @throws RuntimeException if the target is unknown or the payload has an unexpected type
     */
    public boolean onMessage(int i, int i2, int i3, int i4, Object obj) {
        if (this.messages.get(i3) == null) {
            throw new RuntimeException(String.format("Worker %d, receive error. Receiver did not expect messages for this target %d", this.executor, i3));
        }
        Map<Integer, Boolean> finished = this.finishedSources.get(i3);

        // An empty-finish message only updates the finished state; its payload is never buffered.
        if ((i4 & EMPTY_FINISH_FLAG) == EMPTY_FINISH_FLAG) {
            finished.put(i, true);
            if (this.isFinalBatchReceiver || !isSourcesFinished(i3)) {
                return true;
            }
            return moveMessagesToSendQueue(i3, this.messages.get(i3));
        }

        if (!(obj instanceof Tuple) && !(obj instanceof List)) {
            throw new RuntimeException(String.format("Worker %d, receive error. Received object which is not of type Tuple or List for target %d", this.executor, i3));
        }

        // The first data message fixes the source id used for every later outgoing send.
        if (!this.representSourceSet) {
            this.representSource = i;
            this.representSourceSet = true;
        }

        boolean accepted = offerMessage(i3, obj);
        if (accepted && (i4 & LAST_MESSAGE_FLAG) == LAST_MESSAGE_FLAG) {
            // NOTE(review): this marks representSource, not the sending source i, as finished.
            // Kept as found; confirm that this is intended.
            finished.put(this.representSource, true);
            if (!this.isFinalBatchReceiver && isSourcesFinished(i3)) {
                moveMessagesToSendQueue(i3, this.messages.get(i3));
            }
        }
        return accepted;
    }

    /**
     * Buffers a payload for a target, flushing to the send queue instead when a limit is hit.
     *
     * <p>NOTE(review): {@link #onMessage} lets any {@code List} through, but only
     * {@code AggregatedObjects} is treated as a list here; any other {@code List} fails the cast to
     * {@code Tuple}. Kept as found.
     *
     * @param i target id
     * @param obj an {@code AggregatedObjects} list of tuples, or a single {@code Tuple}
     * @return {@code true} if the payload was buffered; {@code false} if a flush was triggered and
     *     the payload was not stored
     * @throws RuntimeException if a value of an aggregated payload could not be queued
     */
    protected boolean offerMessage(int i, Object obj) {
        Map<Object, Queue<Object>> pending = this.messages.get(i);
        if (!this.isFinalBatchReceiver && pending.size() > this.keyLimit) {
            LOG.fine(() -> String.format("Executor %d Partial cannot add any further keys needs flush ", this.executor));
            moveMessagesToSendQueue(i, pending);
            return false;
        }
        if (obj instanceof AggregatedObjects) {
            return offerAggregated(i, pending, (List<?>) obj);
        }
        return offerSingle(i, pending, (Tuple) obj);
    }

    /** Buffers the value of one tuple, or flushes its key when the per-key limit is reached. */
    private boolean offerSingle(int target, Map<Object, Queue<Object>> pending, Tuple tuple) {
        Object key = tuple.getKey();
        Queue<Object> values = pending.get(key);
        if (values == null) {
            Queue<Object> created = new ArrayDeque<>();
            created.add(tuple.getValue());
            pending.put(key, created);
            return true;
        }
        if (values.size() < this.limitPerKey) {
            return values.offer(tuple.getValue());
        }
        LOG.fine(() -> String.format("Executor %d Partial cannot add any further values for key needs flush ", this.executor));
        moveMessageToSendQueue(target, pending, key);
        return false;
    }

    /**
     * Buffers every tuple of an aggregated payload. The values are grouped by key first, so that
     * nothing is stored when one of the keys turns out to be full.
     */
    private boolean offerAggregated(int target, Map<Object, Queue<Object>> pending, List<?> tuples) {
        Map<Object, List<Object>> grouped = new HashMap<>();
        for (Object element : tuples) {
            Tuple tuple = (Tuple) element;
            Object key = tuple.getKey();
            if (!this.isFinalBatchReceiver) {
                Queue<Object> existing = pending.get(key);
                if (existing != null && existing.size() >= this.limitPerKey) {
                    moveMessageToSendQueue(target, pending, key);
                    LOG.fine(() -> String.format("Executor %d Partial cannot add any further values for key needs flush ", this.executor));
                    return false;
                }
            }
            List<Object> bucket = grouped.get(key);
            if (bucket == null) {
                // Raw construction as in the original code; the bucket is only a temporary list.
                @SuppressWarnings({"unchecked", "rawtypes"})
                List<Object> created = new AggregatedObjects();
                bucket = created;
                grouped.put(key, bucket);
            }
            bucket.add(tuple.getValue());
        }

        boolean allQueued = true;
        for (Map.Entry<Object, List<Object>> entry : grouped.entrySet()) {
            Queue<Object> values = pending.get(entry.getKey());
            boolean newKey = values == null;
            if (newKey) {
                values = new ArrayDeque<>();
            }
            for (Object value : entry.getValue()) {
                allQueued &= values.offer(value);
            }
            if (newKey) {
                pending.put(entry.getKey(), values);
            }
        }
        if (!allQueued) {
            throw new RuntimeException("Message lost during processing");
        }
        return true;
    }

    /**
     * Marks the given source as finished for every registered target.
     *
     * @param i source id
     */
    public void onFinish(int i) {
        for (Map<Integer, Boolean> sourceStates : this.finishedSources.values()) {
            sourceStates.put(i, true);
        }
    }

    /**
     * Moves all buffered values of a target into its send queue as {@code Tuple(key, value)}
     * entries and removes every key that was drained completely.
     *
     * <p>Declared public here; a decompiler note in this file says the access modifier was changed
     * from protected.
     *
     * @param i target id
     * @param map buffered values of that target
     * @return {@code true} if {@code map} is empty afterwards
     */
    public boolean moveMessagesToSendQueue(int i, Map<Object, Queue<Object>> map) {
        Queue<Object> outgoing = this.sendQueue.get(i);
        map.entrySet().removeIf(entry -> drainInto(entry.getKey(), entry.getValue(), outgoing));
        return map.isEmpty();
    }

    /**
     * Moves the buffered values of a single key into the target's send queue and removes the key
     * once its queue is empty.
     *
     * <p>Declared public here; a decompiler note in this file says the access modifier was changed
     * from protected.
     *
     * @param i target id
     * @param map buffered values of that target; must contain {@code obj}
     * @param obj key to flush
     * @return {@code true} if the key was drained and removed, {@code false} otherwise
     */
    public boolean moveMessageToSendQueue(int i, Map<Object, Queue<Object>> map, Object obj) {
        Queue<Object> outgoing = this.sendQueue.get(i);
        Queue<Object> values = map.get(obj);
        if (!drainInto(obj, values, outgoing)) {
            return false;
        }
        // peek() returned null; only drop the key if the queue really is empty.
        if (!values.isEmpty()) {
            return false;
        }
        map.remove(obj);
        return true;
    }

    /**
     * Moves values from {@code values} to {@code outgoing}, wrapping each one in a tuple with
     * {@code key}. A value is removed from {@code values} only after it was accepted.
     *
     * @return {@code true} if {@code values.peek()} finally returned null, {@code false} if
     *     {@code outgoing} rejected a tuple
     */
    private static boolean drainInto(Object key, Queue<Object> values, Queue<Object> outgoing) {
        Object head;
        while ((head = values.peek()) != null) {
            if (!outgoing.offer(new Tuple(key, head))) {
                return false;
            }
            values.poll();
        }
        return true;
    }

    /**
     * Tells whether every source registered for a target has been marked finished.
     *
     * <p>Declared public here; a decompiler note in this file says the access modifier was changed
     * from protected.
     *
     * @param i target id
     * @return {@code true} if no source of the target is still unfinished
     */
    public boolean isSourcesFinished(int i) {
        for (boolean sourceFinished : this.finishedSources.get(i).values()) {
            if (!sourceFinished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Makes sure the empty finish message for a target went out, sending it if necessary.
     *
     * @param i target id
     * @return {@code true} if the empty message has been sent (now or earlier), {@code false} if
     *     the delegate is not complete yet or the send was rejected
     */
    protected boolean checkIfEmptyIsSent(int i) {
        if (this.isEmptySent.get(i)) {
            return true;
        }
        if (this.dataFlowOperation.isDelegateComplete()
                && this.dataFlowOperation.sendPartial(i, new byte[0], EMPTY_FINISH_FLAG, this.destination)) {
            this.isEmptySent.put(i, true);
            return true;
        }
        return false;
    }

    /**
     * Advances every registered target: sends queued tuples and, once a target is fully drained
     * and its sources are finished, sends the finish messages.
     *
     * @return {@code true} if at least one target still needs another call
     */
    public boolean progress() {
        boolean needsFurtherProgress = false;
        for (int target : this.messages.keySet()) {
            if (this.batchDone.get(target)) {
                // Only ever raise the flag here: assigning the result directly would discard the
                // pending state of targets handled earlier in this loop.
                if (!checkIfEmptyIsSent(target)) {
                    needsFurtherProgress = true;
                }
                continue;
            }

            Queue<Object> queue = this.sendQueue.get(target);
            boolean sourcesFinished = isSourcesFinished(target);
            if (!sourcesFinished
                    && (!this.dataFlowOperation.isDelegateComplete()
                    || !this.messages.get(target).isEmpty()
                    || !queue.isEmpty())) {
                needsFurtherProgress = true;
            }
            if (!queue.isEmpty() || sourcesFinished) {
                needsFurtherProgress = sendToTarget(needsFurtherProgress, sourcesFinished, target, queue);
            }
            boolean allQueuesEmpty = isAllQueuesEmpty(queue);
            if (!allQueuesEmpty) {
                needsFurtherProgress = true;
            }
            if (this.dataFlowOperation.isDelegateComplete() && sourcesFinished && allQueuesEmpty) {
                needsFurtherProgress = finishProgress(needsFurtherProgress, target);
            }
        }
        return needsFurtherProgress;
    }

    /**
     * @return always {@code false} in this base class
     */
    public boolean isComplete() {
        return false;
    }

    /**
     * Sends an empty finish message for each entry of {@link #thisSources} and marks the target as
     * done.
     *
     * <p>NOTE(review): the target is marked done after the first successful send, so when a later
     * send is rejected the remaining sources are not retried by {@link #progress}. Kept as found;
     * confirm that this is intended.
     *
     * @param z progress flag accumulated so far
     * @param i target id
     * @return {@code true} if a send was rejected, otherwise {@code z}
     */
    protected boolean finishProgress(boolean z, int i) {
        for (int source : this.thisSources) {
            if (!this.dataFlowOperation.sendPartial(source, new byte[0], EMPTY_FINISH_FLAG, i)) {
                return true;
            }
            this.isEmptySent.put(i, true);
            this.batchDone.put(i, true);
        }
        return z;
    }

    /**
     * @param queue send queue of a target
     * @return {@code true} if the queue holds no elements
     */
    protected boolean isAllQueuesEmpty(Queue<Object> queue) {
        return queue.isEmpty();
    }

    /**
     * Sends queued tuples of a target until the queue is empty or a send is rejected.
     *
     * @param z progress flag accumulated so far
     * @param z2 whether all sources of the target are finished
     * @param i target id
     * @param queue send queue of the target
     * @return {@code true} if a send was rejected, otherwise {@code z}
     */
    protected boolean sendToTarget(boolean z, boolean z2, int i, Queue<Object> queue) {
        int flags = 0;
        Object head;
        while ((head = queue.peek()) != null) {
            // NOTE(review): the queues created in init are ConcurrentLinkedDeque instances, whose
            // size() walks the whole deque, so this check is linear per iteration.
            if (z2 && queue.size() == 1) {
                flags = LAST_MESSAGE_FLAG;
            }
            if (!this.dataFlowOperation.sendPartial(this.representSource, head, flags, i)) {
                return true;
            }
            queue.poll();
        }
        return z;
    }
}
