package edu.iu.dsc.tws.comms.dfw;

import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.channel.ChannelReceiver;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.comms.messaging.MessageHeader;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.packing.MessageSchema;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.io.AggregatedObjects;
import edu.iu.dsc.tws.comms.dfw.io.Deserializers;
import edu.iu.dsc.tws.comms.dfw.io.ReceiverState;
import edu.iu.dsc.tws.comms.dfw.io.Serializers;
import edu.iu.dsc.tws.comms.utils.TaskPlanUtils;
import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/MToNChain.class */
public class MToNChain implements DataFlowOperation, ChannelReceiver {
    private static final Logger LOG = Logger.getLogger(MToNChain.class.getName());
    // Receiver that gets the messages handed to send(int, Object, int, int).
    private MessageReceiver merger;
    // Receiver that gets the messages arriving through receiveMessage / receiveSendInternally.
    private MessageReceiver finalReceiver;
    // Channel operation used to actually send messages and to drive send/receive progress.
    private ControlledChannelOperation delegate;
    private LogicalPlan taskPlan;
    // Sorted ids of the workers that host at least one target.
    private List<Integer> targetWorkers;
    private int thisWorker;
    private MessageType dataType;
    private MessageType keyType;
    private Set<Integer> sources;
    private Set<Integer> targets;
    // One source of this worker, used as the source id of every message sent via the delegate.
    private int representSource;
    // True when a key type was supplied to the constructor.
    private boolean isKeyed;
    private Set<Integer> thisWorkerSources;
    private MessageType receiveDataType;
    private MessageType receiveKeyType;
    // Index of the worker group this worker currently sends to / receives from.
    private int sendGroupIndex;
    private int receiveGroupIndex;
    // Counters for the current step; both are reset in startNextStep().
    private int competedSends;
    private int competedReceives;
    private int[] thisSourceArray;
    private int[] targetsArray;
    // Incremented in send() for non-sync messages, decremented in sendPartial().
    private int mergerInMemoryMessages;
    // Incremented in sendPartial(), decremented in sendToGroup().
    private int mergedInMemoryMessages;
    private int inMemoryMessageThreshold;
    private Config config;
    private IntArraySet targetsOfThisWorker;
    private int lowWaterMark;
    private int highWaterMark;
    private long groupingSize;
    private MessageSchema messageSchema;
    private TWSChannel channel;
    // Per target: queue of aggregated objects waiting to be sent by sendToGroup().
    private Int2ObjectArrayMap<Queue<AggregatedObjects<Object>>> merged = new Int2ObjectArrayMap<>();
    // Held around the channel progress calls made from progress().
    private Lock lock = new ReentrantLock();
    private Lock partialLock = new ReentrantLock();
    // Worker <-> task lookup tables filled by calculateWorkerIdToTargets().
    private Map<Integer, IntArrayList> workerToTargets = new HashMap();
    private Map<Integer, IntArrayList> workerToSources = new HashMap();
    private Map<Integer, Integer> targetsToWorkers = new HashMap();
    private Map<Integer, Integer> sourcesToWorkers = new HashMap();
    // Per target: routing parameters built by calculateRoutingParameters().
    private Map<Integer, RoutingParameters> targetRoutes = new HashMap();
    // Per worker of the current send group: index of the next target of that worker to send to.
    private Map<Integer, Integer> sendWorkerTaskIndex = new HashMap();
    private Lock swapLock = new ReentrantLock();
    // Worker groups and the tasks they contain, filled by calculateReceiveGroups().
    private List<IntArrayList> receiveGroupsWorkers = new ArrayList();
    private List<IntArrayList> sendingGroupsWorkers = new ArrayList();
    private List<IntArrayList> sendingGroupsTargets = new ArrayList();
    private List<IntArrayList> receiveGroupsSources = new ArrayList();
    // Sources of this worker for which sendPartial() has seen a sync message.
    private IntArraySet syncedSources = new IntArraySet();
    // Per group: number of targets (sends) / sources (receives) contained in the group.
    private IntList sendsNeedsToComplete = new IntArrayList();
    private IntList receivesNeedsToComplete = new IntArrayList();
    // Per source worker: number of sources it hosts (0 is stored for this worker).
    private Int2IntArrayMap sourcesPerWorker = new Int2IntArrayMap();
    // Targets to which sendSyncs() has already sent a sync message.
    private IntOpenHashSet syncSent = new IntOpenHashSet();
    private Int2ObjectArrayMap<ReceiverState> sourceStates = new Int2ObjectArrayMap<>();
    // Sources for which finish(int) has been called.
    private IntArraySet mergeFinishSources = new IntArraySet();
    private boolean thisSourcesSynced = false;
    // True when the merger rejected the last message offered in send().
    private boolean mergerBlocked = false;
    private IntArraySet finishedSendGroups = new IntArraySet();
    private IntArraySet finishedReceiveGroups = new IntArraySet();
    private boolean startedSyncRound = false;
    private boolean finishedReceiving = false;
    // Once true, progress() only drives the final receiver.
    private boolean doneProgress = false;
    // Per target: sources whose sync has been recorded by addSync().
    private Int2ObjectOpenHashMap<Set<Integer>> finishedSources = new Int2ObjectOpenHashMap<>();
    private IntArrayList finishedTargets = new IntArrayList();
    private boolean receivingFinalSyncs = false;
    private boolean allTargetsReceivedSyncs = false;
    private int roundNumber = 0;
    private boolean lastRound = false;
    private boolean roundCompleted = false;
    private ProgressState progressState = ProgressState.ROUND_DONE;
    // Placeholder sent by sendToGroup() when a target has nothing queued.
    private AggregatedObjects<Object> empty = new AggregatedObjects<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/MToNChain$ProgressState.class */
    /**
     * Phase of the sending side, as evaluated at the start of every {@code progress()} call.
     */
    public enum ProgressState {
        /** Set after the merger has been progressed; in this state progress() calls sendToGroup(). */
        MERGED,
        /** Initial state; also set when a round ends while local sources are not synced or data is still queued. */
        ROUND_DONE,
        /** Set when a round ends with local sources synced and nothing queued; progress() then calls sendSyncs(). */
        SYNC_STARTED
    }

    /**
     * Builds the operation: initialises both receivers, computes the worker/task lookup tables,
     * routing parameters and worker groups, creates the delegate channel operation and starts the
     * first step.
     *
     * @param config configuration used to read water marks, grouping size and buffer counts
     * @param tWSChannel channel handed to the delegate and progressed from {@code progress()}
     * @param logicalPlan plan used to map tasks to workers
     * @param set source task ids
     * @param set2 target task ids
     * @param messageReceiver final receiver (gets the received messages)
     * @param messageReceiver2 merger (gets the messages passed to {@code send})
     * @param messageType data type used for sending
     * @param messageType2 data type used for receiving
     * @param messageType3 key type used for sending; a non-null value makes the operation keyed
     * @param messageType4 key type used for receiving
     * @param i forwarded unchanged to the delegate; presumably the edge id — TODO confirm
     * @param messageSchema schema passed to the serializers and deserializers
     */
    public MToNChain(Config config, TWSChannel tWSChannel, LogicalPlan logicalPlan, Set<Integer> set, Set<Integer> set2, MessageReceiver messageReceiver, MessageReceiver messageReceiver2, MessageType messageType, MessageType messageType2, MessageType messageType3, MessageType messageType4, int i, MessageSchema messageSchema) {
        this.merger = messageReceiver2;
        this.finalReceiver = messageReceiver;
        this.taskPlan = logicalPlan;
        this.dataType = messageType;
        this.keyType = messageType3;
        this.sources = set;
        this.targets = set2;
        this.receiveDataType = messageType2;
        this.receiveKeyType = messageType4;
        this.config = config;
        this.channel = tWSChannel;
        // NOTE(review): the threshold is read from the same low-water-mark setting as lowWaterMark — confirm intended.
        this.inMemoryMessageThreshold = CommunicationContext.getNetworkPartitionMessageGroupLowWaterMark(config);
        this.lowWaterMark = CommunicationContext.getNetworkPartitionMessageGroupLowWaterMark(config);
        this.highWaterMark = CommunicationContext.getNetworkPartitionMessageGroupHighWaterMark(config);
        this.groupingSize = CommunicationContext.getNetworkPartitionBatchGroupingSize(config);
        this.messageSchema = messageSchema;
        // Keep the grouping size strictly below the gap between the two water marks.
        if (this.highWaterMark - this.lowWaterMark <= this.groupingSize) {
            this.groupingSize = (this.highWaterMark - this.lowWaterMark) - 1;
            LOG.fine("Changing the grouping size to: " + this.groupingSize);
        }
        this.thisWorker = logicalPlan.getThisWorker();
        this.targetsOfThisWorker = new IntArraySet(TaskPlanUtils.getTasksOfThisWorker(logicalPlan, set2));
        Set<Integer> tasksOfThisWorker = TaskPlanUtils.getTasksOfThisWorker(logicalPlan, set);
        // Merger expectation map: every target is fed by the sources hosted on this worker.
        HashMap hashMap = new HashMap();
        Iterator<Integer> it = set2.iterator();
        while (it.hasNext()) {
            hashMap.put(Integer.valueOf(it.next().intValue()), new ArrayList(tasksOfThisWorker));
        }
        this.merger.init(config, this, hashMap);
        // Final receiver expectation map: every target on this worker is fed by all sources.
        HashMap hashMap2 = new HashMap();
        IntIterator it2 = this.targetsOfThisWorker.iterator();
        while (it2.hasNext()) {
            hashMap2.put(Integer.valueOf(((Integer) it2.next()).intValue()), new ArrayList(set));
        }
        this.finalReceiver.init(config, this, hashMap2);
        calculateWorkerIdToTargets(set2, this.workerToTargets, this.targetsToWorkers);
        calculateWorkerIdToTargets(set, this.workerToSources, this.sourcesToWorkers);
        this.targetWorkers = new ArrayList(this.workerToTargets.keySet());
        Collections.sort(this.targetWorkers);
        ArrayList arrayList = new ArrayList(this.workerToSources.keySet());
        Collections.sort(arrayList);
        // Record how many sources each remote worker hosts; this worker itself is stored as 0.
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            int intValue = ((Integer) it3.next()).intValue();
            if (intValue == this.thisWorker) {
                this.sourcesPerWorker.put(intValue, 0);
            } else {
                this.sourcesPerWorker.put(intValue, TaskPlanUtils.getTasksOfWorker(this.taskPlan, intValue, set).size());
            }
        }
        calculateRoutingParameters();
        // representSource keeps its default (0) when this worker hosts no source.
        if (tasksOfThisWorker.size() > 0) {
            this.representSource = tasksOfThisWorker.iterator().next().intValue();
        }
        if (this.keyType != null) {
            this.isKeyed = true;
        }
        this.thisWorkerSources = TaskPlanUtils.getTasksOfThisWorker(this.taskPlan, set);
        int i2 = 0;
        this.thisSourceArray = new int[this.thisWorkerSources.size()];
        Iterator<Integer> it4 = this.thisWorkerSources.iterator();
        while (it4.hasNext()) {
            int intValue2 = it4.next().intValue();
            int i3 = i2;
            i2++;
            this.thisSourceArray[i3] = intValue2;
            this.sourceStates.put(intValue2, ReceiverState.INIT);
        }
        this.targetsArray = new int[set2.size()];
        int i4 = 0;
        Iterator<Integer> it5 = set2.iterator();
        while (it5.hasNext()) {
            int i5 = i4;
            i4++;
            this.targetsArray[i5] = it5.next().intValue();
        }
        // Workers hosting sources, excluding this worker.
        Set<Integer> workersOfTasks = TaskPlanUtils.getWorkersOfTasks(logicalPlan, set);
        workersOfTasks.remove(Integer.valueOf(this.taskPlan.getThisWorker()));
        // Per-task queues and (de)serializers handed to the delegate below:
        // hashMap3/hashMap6 are keyed by the sources of this worker, hashMap4/hashMap5/hashMap7 by all sources.
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        HashMap hashMap5 = new HashMap();
        HashMap hashMap6 = new HashMap();
        HashMap hashMap7 = new HashMap();
        Iterator<Integer> it6 = tasksOfThisWorker.iterator();
        while (it6.hasNext()) {
            int intValue3 = it6.next().intValue();
            hashMap3.put(Integer.valueOf(intValue3), new ArrayBlockingQueue(CommunicationContext.sendPendingMax(config)));
            hashMap6.put(Integer.valueOf(intValue3), Serializers.get(this.isKeyed, this.messageSchema));
        }
        int receiveBufferCount = CommunicationContext.receiveBufferCount(config);
        int size = workersOfTasks.size();
        // Use at least one worker so the receive queue capacity below is never zero.
        size = size == 0 ? 1 : size;
        Iterator<Integer> it7 = set.iterator();
        while (it7.hasNext()) {
            int intValue4 = it7.next().intValue();
            int i6 = receiveBufferCount * 2 * size;
            hashMap4.put(Integer.valueOf(intValue4), new ArrayBlockingQueue(i6));
            hashMap5.put(Integer.valueOf(intValue4), new ArrayBlockingQueue(i6));
            hashMap7.put(Integer.valueOf(intValue4), Deserializers.get(this.isKeyed, this.messageSchema));
        }
        calculateReceiveGroups();
        // Every target starts with an empty outgoing queue and an empty set of finished sources.
        Iterator<Integer> it8 = set2.iterator();
        while (it8.hasNext()) {
            int intValue5 = it8.next().intValue();
            this.merged.put(intValue5, new LinkedList());
            this.finishedSources.put(intValue5, new HashSet());
        }
        if (this.thisWorker == 0) {
            LOG.info("Starting ring algorithm");
        }
        this.delegate = new ControlledChannelOperation(tWSChannel, config, this.dataType, messageType2, messageType3, messageType4, logicalPlan, i, workersOfTasks, this, hashMap3, hashMap4, hashMap5, hashMap6, hashMap7, this.isKeyed, this.sendingGroupsTargets, this.receiveGroupsSources, this.receiveGroupsWorkers);
        startNextStep();
    }

    /**
     * Resets the per-step bookkeeping and asks the delegate to start the current send and
     * receive groups. Every worker of the current send group restarts at its first target.
     */
    private void startNextStep() {
        IntArrayList workersInSendGroup = this.sendingGroupsWorkers.get(this.sendGroupIndex);
        for (int pos = 0; pos < workersInSendGroup.size(); pos++) {
            this.sendWorkerTaskIndex.put(workersInSendGroup.getInt(pos), 0);
        }
        this.competedSends = 0;
        this.competedReceives = 0;
        this.delegate.startGroup(this.receiveGroupIndex, this.sendGroupIndex, this.sourcesPerWorker);
    }

    /**
     * Splits the source workers and target workers into groups, records which group this worker
     * starts with, and derives per group the contained tasks and how many sends or receives must
     * complete.
     *
     * <p>Workers are sorted in ascending id order with {@code Comparator.naturalOrder()}, which
     * cannot overflow the way a subtraction-based comparator can.
     */
    private void calculateReceiveGroups() {
        Set<Integer> sourceWorkerSet = TaskPlanUtils.getWorkersOfTasks(this.taskPlan, this.sources);
        Set<Integer> targetWorkerSet = TaskPlanUtils.getWorkersOfTasks(this.taskPlan, this.targets);
        List<Integer> sortedSourceWorkers = new ArrayList<>(sourceWorkerSet);
        List<Integer> sortedTargetWorkers = new ArrayList<>(targetWorkerSet);
        sortedSourceWorkers.sort(Comparator.naturalOrder());
        sortedTargetWorkers.sort(Comparator.naturalOrder());

        // Both sides use the same group count: the smaller of the two ceil(workers / workersPerGroup).
        int ringWorkersPerGroup = CommunicationContext.getRingWorkersPerGroup(this.config);
        int numGroups = (int) Math.min(
            Math.ceil(targetWorkerSet.size() / (ringWorkersPerGroup * 1.0d)),
            Math.ceil(sourceWorkerSet.size() / (ringWorkersPerGroup * 1.0d)));
        this.sendGroupIndex = createGroup(sortedTargetWorkers, numGroups, this.sendingGroupsWorkers);
        this.receiveGroupIndex = createGroup(sortedSourceWorkers, numGroups, this.receiveGroupsWorkers);

        // Per send group: all targets hosted by its workers and the number of sends to complete.
        for (IntArrayList groupWorkers : this.sendingGroupsWorkers) {
            IntArrayList groupTargets = new IntArrayList();
            int targetCount = 0;
            for (int pos = 0; pos < groupWorkers.size(); pos++) {
                IntArrayList workerTargets = this.workerToTargets.get(groupWorkers.getInt(pos));
                targetCount += workerTargets.size();
                groupTargets.addAll(workerTargets);
            }
            this.sendsNeedsToComplete.add(targetCount);
            this.sendingGroupsTargets.add(groupTargets);
        }

        // Per receive group: all sources hosted by its workers and the number of receives to complete.
        for (IntArrayList groupWorkers : this.receiveGroupsWorkers) {
            IntArrayList groupSources = new IntArrayList();
            int sourceCount = 0;
            for (int pos = 0; pos < groupWorkers.size(); pos++) {
                IntArrayList workerSources = this.workerToSources.get(groupWorkers.getInt(pos));
                sourceCount += workerSources.size();
                groupSources.addAll(workerSources);
            }
            this.receivesNeedsToComplete.add(sourceCount);
            this.receiveGroupsSources.add(groupSources);
        }
    }

    /**
     * Partitions {@code list} into consecutive groups of {@code list.size() / i} workers and
     * appends each group to {@code list2}.
     *
     * @param list worker ids, in the order they should be grouped
     * @param i number of groups to aim for (used as divisor)
     * @param list2 output list that receives the created groups
     * @return index in {@code list2} of the group containing this worker, or 0 if it is in none
     */
    private int createGroup(List<Integer> list, int i, List<IntArrayList> list2) {
        final int workersPerGroup = list.size() / i;
        int groupOfThisWorker = 0;
        IntArrayList current = new IntArrayList();
        for (Integer worker : list) {
            // A group is registered as soon as it gets its first member.
            if (current.isEmpty()) {
                list2.add(current);
            }
            current.add(worker.intValue());
            if (worker.intValue() == this.thisWorker) {
                groupOfThisWorker = list2.size() - 1;
            }
            // A full group is closed; the next worker opens a new one.
            if (current.size() == workersPerGroup) {
                current = new IntArrayList();
            }
        }
        return groupOfThisWorker;
    }

    /**
     * Fills two lookup tables for the given tasks: worker id to hosted tasks, and task to worker id.
     *
     * @param set task ids to classify
     * @param map output: worker id to the list of its tasks
     * @param map2 output: task id to its worker id
     */
    private void calculateWorkerIdToTargets(Set<Integer> set, Map<Integer, IntArrayList> map, Map<Integer, Integer> map2) {
        for (int task : set) {
            int worker = this.taskPlan.getWorkerForForLogicalId(task);
            map.computeIfAbsent(worker, unused -> new IntArrayList()).add(task);
            map2.put(task, worker);
        }
    }

    /**
     * Rebuilds {@code targetRoutes}: one routing entry per target, internal when the target lives
     * on this worker and external otherwise.
     */
    private void calculateRoutingParameters() {
        this.targetRoutes = new HashMap<>();
        for (int target : this.targets) {
            RoutingParameters route = new RoutingParameters();
            boolean hostedHere = this.targetsToWorkers.get(target) == this.thisWorker;
            if (hostedHere) {
                route.addInteranlRoute(target);
            } else {
                route.addExternalRoute(target);
            }
            route.setDestinationId(target);
            this.targetRoutes.put(target, route);
        }
    }

    /**
     * Not supported by this operation; use the four-argument overload that also names the target.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean sendPartial(int i, Object obj, int i2) {
        throw new UnsupportedOperationException("Operation is not supported");
    }

    /**
     * Not supported by this operation; use the four-argument overload that also names the target.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean send(int i, Object obj, int i2) {
        throw new UnsupportedOperationException("Operation is not supported");
    }

    /**
     * Offers a message to the merger.
     *
     * @param i source id
     * @param obj message payload
     * @param i2 message flags
     * @param i3 target id
     * @return true when the merger accepted the message; false when it is currently blocked
     */
    public boolean send(int i, Object obj, int i2, int i3) {
        final int syncFlag = 67108864; // bit 26; presumably the sync flag — TODO confirm
        boolean accepted = this.merger.onMessage(i, 0, i3, i2, obj);
        this.mergerBlocked = !accepted;
        // Only messages without the flag count towards the merger's in-memory total.
        if (accepted && (i2 & syncFlag) != syncFlag) {
            this.mergerInMemoryMessages++;
        }
        return accepted;
    }

    /**
     * Accepts merged output: a flagged message marks the source as synced, anything else is
     * queued for the given target.
     *
     * @param i source id
     * @param obj payload; must be an {@code AggregatedObjects} unless the flag is set
     * @param i2 message flags
     * @param i3 target id
     * @return false when the queued total has reached the high water mark, true otherwise
     * @throws RuntimeException if an unflagged payload is not an {@code AggregatedObjects}
     */
    public boolean sendPartial(int i, Object obj, int i2, int i3) {
        final int syncFlag = 67108864; // bit 26; presumably the sync flag — TODO confirm
        if ((i2 & syncFlag) != syncFlag) {
            if (this.mergedInMemoryMessages >= this.highWaterMark * this.targetsArray.length) {
                return false;
            }
            Queue<AggregatedObjects<Object>> pending = this.merged.get(i3);
            if (!(obj instanceof AggregatedObjects)) {
                throw new RuntimeException("Un-expected message");
            }
            @SuppressWarnings("unchecked")
            AggregatedObjects<Object> batch = (AggregatedObjects<Object>) obj;
            pending.add(batch);
            // The batch moves from the merger's accounting to the queued accounting.
            int batchSize = batch.size();
            this.mergedInMemoryMessages += batchSize;
            this.mergerInMemoryMessages -= batchSize;
            return true;
        }

        this.syncedSources.add(i);
        if (this.syncedSources.size() == this.thisSourceArray.length) {
            this.thisSourcesSynced = true;
        }
        return true;
    }

    /**
     * Sends a one-byte flagged message to every remaining target of the current send group,
     * resuming per worker from the index stored in {@code sendWorkerTaskIndex}.
     *
     * @return false as soon as the delegate refuses a message (the call can be retried later),
     *     true otherwise
     */
    private boolean sendSyncs() {
        boolean sentAny = false;
        IntArrayList groupWorkers = this.sendingGroupsWorkers.get(this.sendGroupIndex);
        for (int pos = 0; pos < groupWorkers.size(); pos++) {
            int worker = groupWorkers.getInt(pos);
            IntArrayList workerTargets = this.workerToTargets.get(worker);
            int next = this.sendWorkerTaskIndex.get(worker);
            while (next < workerTargets.size()) {
                int target = workerTargets.getInt(next);
                boolean accepted = this.delegate.sendMessage(
                    this.representSource, new byte[1], target, 67108864, this.targetRoutes.get(target));
                if (!accepted) {
                    return false;
                }
                sentAny = true;
                next++;
                this.syncSent.add(target);
                // Remember the position so a later retry continues after this target.
                this.sendWorkerTaskIndex.put(worker, next);
            }
        }
        if (sentAny) {
            if (this.syncSent.size() == this.targetsArray.length) {
                for (int source : this.thisWorkerSources) {
                    this.sourceStates.put(source, ReceiverState.SYNCED);
                }
            }
            this.finishedSendGroups.add(this.sendGroupIndex);
        }
        return true;
    }

    /**
     * Steps an index one position backwards in a ring.
     *
     * @param i current index
     * @param i2 ring size
     * @return the previous index, wrapping from 0 to {@code i2 - 1}
     */
    private int decrement(int i, int i2) {
        int shifted = i - 1 + i2;
        return shifted % i2;
    }

    /**
     * Steps an index one position forwards in a ring.
     *
     * @param i current index
     * @param i2 ring size
     * @return the next index, wrapping from {@code i2 - 1} to 0
     */
    private int increment(int i, int i2) {
        int next = i + 1;
        return next % i2;
    }

    /**
     * Callback counting one more completed send for the current step; the counter is compared
     * in {@code progress()} against the number of sends the current group needs.
     *
     * @param obj the completed message (not inspected)
     */
    public void sendCompleted(Object obj) {
        this.competedSends++;
    }

    /**
     * Drives one iteration of the operation: progresses the merger when needed, sends data or
     * syncs to the current send group, progresses channel sends and receives, and advances to
     * the next group or round once the current step has completed.
     *
     * <p>The channel is always progressed while holding {@code lock}; the lock is released
     * exactly once in a {@code finally} block, so it is neither leaked when the channel throws
     * nor unlocked twice.
     *
     * @return false when the operation was already done on entry, or when it is done and the
     *     final receiver reports completion; true when more progress calls are needed
     */
    public boolean progress() {
        if (this.doneProgress) {
            return false;
        }
        int sendsToComplete = this.sendsNeedsToComplete.getInt(this.sendGroupIndex);

        // Progress the merger only at the start of a round and only when there is a reason to.
        boolean mergerPending = true;
        if (this.progressState == ProgressState.ROUND_DONE
            && (this.mergerInMemoryMessages >= this.highWaterMark * this.targetsArray.length
                || this.mergerBlocked
                || this.mergeFinishSources.size() > 0)) {
            this.merger.progress();
            mergerPending = !this.merger.isComplete();
            this.progressState = ProgressState.MERGED;
        }

        boolean sendsIssued = true;
        if (this.progressState == ProgressState.MERGED) {
            sendsIssued = sendToGroup();
        } else if (this.progressState == ProgressState.SYNC_STARTED) {
            sendsIssued = sendSyncs();
            mergerPending = false;
        }

        boolean sendsDone = this.competedSends == sendsToComplete;
        boolean receivesDone =
            this.receivesNeedsToComplete.getInt(this.receiveGroupIndex) == this.competedReceives;
        boolean stepDone = false;
        // Try a bounded number of times to finish the current step within this call.
        for (int attempt = 0; attempt < 4 && !stepDone; attempt++) {
            if (!sendsDone) {
                for (int source : this.thisSourceArray) {
                    this.delegate.sendProgress(source);
                }
                this.lock.lock();
                try {
                    this.channel.progressSends();
                } finally {
                    this.lock.unlock();
                }
            }
            if (!receivesDone) {
                IntArrayList groupSources = this.receiveGroupsSources.get(this.receiveGroupIndex);
                for (int pos = 0; pos < groupSources.size(); pos++) {
                    int source = groupSources.getInt(pos);
                    this.delegate.receiveDeserializeProgress(source);
                    this.delegate.receiveProgress(source);
                }
                this.lock.lock();
                try {
                    this.channel.progressReceives(this.receiveGroupIndex);
                } finally {
                    this.lock.unlock();
                }
            }
            sendsDone = this.competedSends == sendsToComplete;
            receivesDone =
                this.receivesNeedsToComplete.getInt(this.receiveGroupIndex) == this.competedReceives;
            if (sendsIssued && sendsDone && receivesDone) {
                stepDone = true;
            }
        }

        if (!this.receivingFinalSyncs) {
            this.finalReceiver.progress();
        }

        boolean stillExpectingData = !this.allTargetsReceivedSyncs || mergerPending;
        if (stepDone && !stillExpectingData) {
            this.finishedReceiving = true;
        }

        if (stepDone) {
            this.finishedReceiveGroups.add(this.receiveGroupIndex);
            // A round is complete once every send group and every receive group has finished.
            if (this.finishedSendGroups.size() == this.sendingGroupsWorkers.size()
                && this.finishedReceiveGroups.size() == this.receiveGroupsWorkers.size()) {
                if (!this.thisSourcesSynced || containsAnyDataToSend()) {
                    this.progressState = ProgressState.ROUND_DONE;
                } else {
                    this.startedSyncRound = true;
                    this.progressState = ProgressState.SYNC_STARTED;
                }
                this.roundCompleted = true;
                this.roundNumber++;
                this.finishedReceiveGroups.clear();
                this.finishedSendGroups.clear();
                if (this.startedSyncRound && !this.allTargetsReceivedSyncs) {
                    // Sync bookkeeping is restarted for every sync round until all targets are done.
                    this.finishedTargets.clear();
                    IntIterator localTargets = this.targetsOfThisWorker.iterator();
                    while (localTargets.hasNext()) {
                        this.finishedSources.get(localTargets.nextInt()).clear();
                    }
                }
            }

            if (this.roundCompleted && !this.lastRound && this.finishedReceiving) {
                this.lastRound = true;
            } else if (this.lastRound && this.roundCompleted) {
                if (this.delegate.isComplete()) {
                    this.doneProgress = true;
                } else {
                    this.delegate.progress();
                    this.lock.lock();
                    try {
                        this.channel.progress();
                    } finally {
                        this.lock.unlock();
                    }
                }
            }

            if (!this.lastRound || !this.roundCompleted) {
                // Move to the next groups: the send side walks the ring backwards, the receive side forwards.
                this.sendGroupIndex = decrement(this.sendGroupIndex, this.sendingGroupsWorkers.size());
                this.receiveGroupIndex = increment(this.receiveGroupIndex, this.receiveGroupsWorkers.size());
                if (this.roundCompleted) {
                    LOG.info("Starting round number: " + this.roundNumber);
                }
                this.roundCompleted = false;
                startNextStep();
            }
        }

        boolean needsMoreProgress = true;
        if (this.doneProgress) {
            this.finalReceiver.progress();
            needsMoreProgress = !this.finalReceiver.isComplete();
        }
        return needsMoreProgress;
    }

    /**
     * Tells whether any target still has aggregated objects queued for sending.
     *
     * @return true if at least one target queue is non-empty
     */
    private boolean containsAnyDataToSend() {
        for (int target : this.targetsArray) {
            Queue<AggregatedObjects<Object>> pending = this.merged.get(target);
            if (pending != null && !pending.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sends the head of each target queue (or the empty placeholder when the queue has nothing)
     * to every remaining target of the current send group, resuming per worker from the index
     * stored in {@code sendWorkerTaskIndex}.
     *
     * @return false as soon as the delegate refuses a message (the call can be retried later),
     *     true otherwise
     */
    private boolean sendToGroup() {
        boolean sentAny = false;
        IntArrayList groupWorkers = this.sendingGroupsWorkers.get(this.sendGroupIndex);
        for (int pos = 0; pos < groupWorkers.size(); pos++) {
            int worker = groupWorkers.getInt(pos);
            IntArrayList workerTargets = this.workerToTargets.get(worker);
            int next = this.sendWorkerTaskIndex.get(worker);
            while (next < workerTargets.size()) {
                int target = workerTargets.getInt(next);
                Queue<AggregatedObjects<Object>> pending = this.merged.get(target);
                AggregatedObjects<Object> batch = pending.peek();
                if (batch == null) {
                    batch = this.empty;
                }
                boolean accepted = this.delegate.sendMessage(
                    this.representSource, batch, target, 0, this.targetRoutes.get(target));
                if (!accepted) {
                    return false;
                }
                // The head is removed only after the delegate has accepted it.
                pending.poll();
                sentAny = true;
                this.mergedInMemoryMessages -= batch.size();
                next++;
                this.sendWorkerTaskIndex.put(worker, next);
            }
        }
        if (sentAny) {
            this.finishedSendGroups.add(this.sendGroupIndex);
        }
        return true;
    }

    /**
     * Closes both receivers (when present) and the delegate, and forgets the finished sources.
     *
     * <p>The guard tests {@code merger}, the object that is actually closed, rather than the
     * always-initialised {@code merged} queue map.
     */
    public void close() {
        if (this.merger != null) {
            this.merger.close();
        }
        if (this.finalReceiver != null) {
            this.finalReceiver.close();
        }
        this.mergeFinishSources.clear();
        this.delegate.close();
    }

    /**
     * Cleans both receivers (when present) and returns the sync/finish bookkeeping to its
     * initial state so the operation can be used again.
     *
     * <p>The guard tests {@code merger}, the object that is actually cleaned, rather than the
     * always-initialised {@code merged} queue map.
     */
    public void reset() {
        if (this.merger != null) {
            this.merger.clean();
        }
        if (this.finalReceiver != null) {
            this.finalReceiver.clean();
        }
        this.mergeFinishSources.clear();
        this.allTargetsReceivedSyncs = false;
        this.receivingFinalSyncs = false;
        this.doneProgress = false;
        this.finishedTargets.clear();
        this.startedSyncRound = false;
        // Every target gets a fresh outgoing queue and a fresh set of finished sources.
        for (int target : this.targets) {
            this.merged.put(target, new LinkedList<>());
            this.finishedSources.put(target, new HashSet<>());
        }
        this.finishedReceiveGroups.clear();
        this.finishedSendGroups.clear();
        this.mergerBlocked = false;
        this.thisSourcesSynced = false;
        this.finishedReceiving = false;
        this.syncSent.clear();
        // NOTE(review): syncedSources, lastRound, roundCompleted, progressState and the in-memory
        // counters are not reset here — confirm whether that is intended.
        int index = 0;
        for (int source : this.thisWorkerSources) {
            this.thisSourceArray[index++] = source;
            this.sourceStates.put(source, ReceiverState.INIT);
        }
    }

    /**
     * @return the logical plan this operation was created with
     */
    public LogicalPlan getLogicalPlan() {
        return this.taskPlan;
    }

    /**
     * @return always {@code null}; this operation does not provide a unique id
     */
    public String getUniqueId() {
        return null;
    }

    /**
     * @return true only when this operation has finished progressing and the delegate reports
     *     completion; the delegate is not consulted before that
     */
    public boolean isComplete() {
        return this.doneProgress && this.delegate.isComplete();
    }

    /**
     * @return the key type used for sending, or {@code null} when none was supplied
     */
    public MessageType getKeyType() {
        return this.keyType;
    }

    /**
     * @return the data type used for sending
     */
    public MessageType getDataType() {
        return this.dataType;
    }

    /**
     * @return the source task ids passed to the constructor (the same set instance, not a copy)
     */
    public Set<Integer> getSources() {
        return this.sources;
    }

    /**
     * @return the target task ids passed to the constructor (the same set instance, not a copy)
     */
    public Set<Integer> getTargets() {
        return this.targets;
    }

    /**
     * Handles a message received from the channel and hands it to the final receiver.
     *
     * <p>Messages arriving after receiving has finished, {@code null} payloads and empty
     * aggregates are only counted. A message carrying flag bit 26 is delivered once for every
     * source hosted on the sender's worker and recorded through {@code addSync}.
     *
     * @param messageHeader header carrying source, destination and flags
     * @param obj payload
     * @return true when the message was consumed, false when the final receiver refused it
     * @throws RuntimeException if a flagged sync is refused, or a non-null payload is not an
     *     {@code AggregatedObjects}
     */
    public boolean receiveMessage(MessageHeader messageHeader, Object obj) {
        if (this.finishedReceiving) {
            this.competedReceives++;
            return true;
        }

        final int syncFlag = 67108864; // bit 26; presumably the sync flag — TODO confirm
        final int passThroughFlag = 33554432; // bit 25; meaning not visible here — TODO confirm
        int flags = messageHeader.getFlags();

        if ((flags & syncFlag) == syncFlag) {
            int senderWorker = this.sourcesToWorkers.get(messageHeader.getSourceId());
            IntArrayList senderSources = this.workerToSources.get(senderWorker);
            for (int pos = 0; pos < senderSources.size(); pos++) {
                int source = senderSources.getInt(pos);
                boolean syncAccepted = this.finalReceiver.onMessage(
                    source, 0, messageHeader.getDestinationIdentifier(), messageHeader.getFlags(), obj);
                if (!syncAccepted) {
                    throw new RuntimeException("Sync should be accepted");
                }
                addSync(source, messageHeader.getDestinationIdentifier());
            }
            this.competedReceives++;
            return true;
        }

        // Messages with bit 25 set skip the payload checks and go straight to the receiver.
        if ((flags & passThroughFlag) != passThroughFlag) {
            if (!(obj instanceof AggregatedObjects)) {
                if (obj != null) {
                    throw new RuntimeException("we can only receive Aggregator objects");
                }
                this.competedReceives++;
                return true;
            }
            if (((AggregatedObjects) obj).size() <= 0) {
                this.competedReceives++;
                return true;
            }
        }

        boolean accepted = this.finalReceiver.onMessage(
            messageHeader.getSourceId(), 0, messageHeader.getDestinationIdentifier(), messageHeader.getFlags(), obj);
        if (accepted) {
            this.competedReceives++;
        }
        return accepted;
    }

    /**
     * Handles a message that was sent from this same worker and hands it to the final receiver.
     *
     * <p>Messages arriving after receiving has finished, {@code null} payloads and empty
     * aggregates are only counted. A message carrying flag bit 26 is delivered once for every
     * source hosted on the sender's worker and recorded through {@code addSync}.
     *
     * @param i source id
     * @param i2 target id
     * @param i3 not used by this method
     * @param i4 message flags
     * @param obj payload
     * @return true when the message was consumed, false when the final receiver refused it
     * @throws RuntimeException if a flagged sync is refused, or a non-null payload is not an
     *     {@code AggregatedObjects}
     */
    public boolean receiveSendInternally(int i, int i2, int i3, int i4, Object obj) {
        if (this.finishedReceiving) {
            this.competedReceives++;
            return true;
        }

        final int syncFlag = 67108864; // bit 26; presumably the sync flag — TODO confirm
        if ((i4 & syncFlag) == syncFlag) {
            int senderWorker = this.sourcesToWorkers.get(i);
            IntArrayList senderSources = this.workerToSources.get(senderWorker);
            for (int pos = 0; pos < senderSources.size(); pos++) {
                int source = senderSources.getInt(pos);
                if (!this.finalReceiver.onMessage(source, 0, i2, i4, obj)) {
                    throw new RuntimeException("Sync should be accepted");
                }
                addSync(source, i2);
            }
            this.competedReceives++;
            return true;
        }

        boolean isAggregate = obj instanceof AggregatedObjects;
        if (!isAggregate && obj != null) {
            throw new RuntimeException("we can only receive Aggregator objects");
        }
        // Nothing to deliver for a missing or empty payload; just count the receive.
        if (!isAggregate || ((AggregatedObjects) obj).size() <= 0) {
            this.competedReceives++;
            return true;
        }

        boolean accepted = this.finalReceiver.onMessage(i, 0, i2, i4, obj);
        if (accepted) {
            this.competedReceives++;
        }
        return accepted;
    }

    /**
     * Records that a sync from the given source reached the given target, and updates the
     * "target finished" and "all targets finished" state accordingly.
     *
     * @param i source id the sync belongs to
     * @param i2 target id that received the sync
     */
    private void addSync(int i, int i2) {
        this.receivingFinalSyncs = true;
        Set<Integer> syncedForTarget = this.finishedSources.get(i2);
        boolean firstTime = syncedForTarget.add(i);
        if (!firstTime) {
            LOG.log(Level.FINE, String.format("%d Duplicate finish from source id %d -> %d", this.thisWorker, i, i2));
        }

        // The target is finished only once every source has synced to it.
        if (syncedForTarget.size() != this.sources.size()) {
            return;
        }
        if (!this.finishedTargets.contains(i2)) {
            this.finishedTargets.add(i2);
        }
        if (this.finishedTargets.size() == this.targetsOfThisWorker.size()) {
            this.allTargetsReceivedSyncs = true;
        }
    }

    /**
     * @return whether the delegate channel operation reports completion, regardless of this
     *     operation's own progress state
     */
    public boolean isDelegateComplete() {
        return this.delegate.isComplete();
    }

    /**
     * Marks a source as finished and pushes a flagged, empty message for it to the merger once
     * per target. While the merger refuses the message, {@code progress()} is called and the
     * send is retried.
     *
     * @param i id of the source that has finished
     */
    public void finish(int i) {
        this.mergeFinishSources.add(i);
        LOG.info("Finishing source " + i);
        for (int target : this.targets) {
            // 67108864 is flag bit 26; presumably the sync flag — TODO confirm
            while (!send(i, new int[0], 67108864, target)) {
                progress();
            }
        }
    }

    /**
     * @return the key type used for receiving
     */
    public MessageType getReceiveKeyType() {
        return this.receiveKeyType;
    }

    /**
     * @return the data type used for receiving
     */
    public MessageType getReceiveDataType() {
        return this.receiveDataType;
    }
}
