package edu.iu.dsc.tws.comms.dfw.io.gather;

import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.comms.dfw.io.AggregatedObjects;
import edu.iu.dsc.tws.comms.shuffle.FSMerger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/gather/DGatherBatchFinalReceiver.class */
public class DGatherBatchFinalReceiver implements MessageReceiver {
    private static final Logger LOG = Logger.getLogger(DGatherBatchFinalReceiver.class.getName());
    private BulkReceiver bulkReceiver;
    private String shuffleDirectory;
    private DataFlowOperation gather;
    private Map<Integer, Map<Integer, Queue<Object>>> messages = new HashMap();
    private Map<Integer, Map<Integer, Boolean>> finished = new HashMap();
    private Map<Integer, List<Object>> finalMessages = new HashMap();
    private int sendPendingMax = 128;
    private Map<Integer, Boolean> batchDone = new HashMap();
    private Map<Integer, FSMerger> sortedMergers = new HashMap();
    private boolean complete = false;
    private KryoSerializer kryoSerializer = new KryoSerializer();

    public DGatherBatchFinalReceiver(BulkReceiver bulkReceiver, String str) {
        this.bulkReceiver = bulkReceiver;
        this.shuffleDirectory = str;
    }

    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        long shuffleMaxBytesInMemory = CommunicationContext.getShuffleMaxBytesInMemory(config);
        long shuffleMaxRecordsInMemory = CommunicationContext.getShuffleMaxRecordsInMemory(config);
        this.gather = dataFlowOperation;
        this.sendPendingMax = CommunicationContext.sendPendingMax(config);
        for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()) {
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            Iterator<Integer> it = entry.getValue().iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                hashMap.put(Integer.valueOf(intValue), new ArrayBlockingQueue(this.sendPendingMax));
                hashMap2.put(Integer.valueOf(intValue), false);
            }
            this.messages.put(entry.getKey(), hashMap);
            this.finished.put(entry.getKey(), hashMap2);
            this.finalMessages.put(entry.getKey(), new ArrayList());
            this.batchDone.put(entry.getKey(), false);
            this.sortedMergers.put(entry.getKey(), new FSMerger(shuffleMaxBytesInMemory, shuffleMaxRecordsInMemory, this.shuffleDirectory, getOperationName(entry.getKey().intValue()), this.gather.getDataType()));
        }
        this.bulkReceiver.init(config, map.keySet());
    }

    public boolean onMessage(int i, int i2, int i3, int i4, Object obj) {
        boolean z = true;
        Queue<Object> queue = this.messages.get(Integer.valueOf(i3)).get(Integer.valueOf(i));
        Map<Integer, Boolean> map = this.finished.get(Integer.valueOf(i3));
        if ((i4 & 67108864) == 67108864) {
            map.put(Integer.valueOf(i), true);
            return true;
        }
        if (queue.size() >= this.sendPendingMax) {
            z = false;
        } else {
            queue.add(obj);
            if ((i4 & 1073741824) == 1073741824) {
                map.put(Integer.valueOf(i), true);
            }
        }
        return z;
    }

    public boolean progress() {
        boolean z = false;
        Iterator<Integer> it = this.messages.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            FSMerger fSMerger = this.sortedMergers.get(Integer.valueOf(intValue));
            if (!this.batchDone.get(Integer.valueOf(intValue)).booleanValue()) {
                boolean z2 = true;
                Map<Integer, Queue<Object>> map = this.messages.get(Integer.valueOf(intValue));
                Map<Integer, Boolean> map2 = this.finished.get(Integer.valueOf(intValue));
                boolean z3 = true;
                boolean z4 = false;
                for (Map.Entry<Integer, Queue<Object>> entry : map.entrySet()) {
                    if (entry.getValue().size() != 0 || map2.get(entry.getKey()).booleanValue()) {
                        z4 = true;
                    } else {
                        z3 = false;
                    }
                    if (!map2.get(entry.getKey()).booleanValue()) {
                        z2 = false;
                    }
                }
                if (!z3 && z4) {
                    z = true;
                }
                if (z3) {
                    AggregatedObjects aggregatedObjects = new AggregatedObjects();
                    Iterator<Map.Entry<Integer, Queue<Object>>> it2 = map.entrySet().iterator();
                    while (it2.hasNext()) {
                        Queue<Object> value = it2.next().getValue();
                        if (value.size() > 0) {
                            Object poll = value.poll();
                            if (poll instanceof List) {
                                aggregatedObjects.addAll((List) poll);
                            } else {
                                aggregatedObjects.add(poll);
                            }
                            z2 = false;
                        }
                    }
                    Iterator<T> it3 = aggregatedObjects.iterator();
                    while (it3.hasNext()) {
                        byte[] packToByteArray = this.gather.getDataType().getDataPacker().packToByteArray(it3.next());
                        fSMerger.add(packToByteArray, packToByteArray.length);
                    }
                } else {
                    z2 = false;
                }
                if (z2) {
                    this.batchDone.put(Integer.valueOf(intValue), true);
                    fSMerger.switchToReading();
                    this.bulkReceiver.receive(intValue, fSMerger.readIterator());
                }
            }
        }
        if (!z) {
            this.complete = true;
        }
        return z;
    }

    public boolean isComplete() {
        return this.complete;
    }

    private String getOperationName(int i) {
        return "gather-" + this.gather.getUniqueId() + "-" + i + "-" + UUID.randomUUID().toString();
    }

    public void close() {
        Iterator<FSMerger> it = this.sortedMergers.values().iterator();
        while (it.hasNext()) {
            it.next().clean();
        }
        this.complete = false;
    }

    public void clean() {
        this.complete = false;
    }

    private void onSyncEvent(int i, byte[] bArr) {
        this.bulkReceiver.sync(i, bArr);
    }
}
