package edu.iu.dsc.tws.comms.dfw.io.gather.keyed;

import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver;
import edu.iu.dsc.tws.comms.shuffle.FSKeyedMerger;
import edu.iu.dsc.tws.comms.shuffle.FSKeyedSortedMerger2;
import edu.iu.dsc.tws.comms.shuffle.Shuffle;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/gather/keyed/DKGatherBatchFinalReceiver.class */
/**
 * Keyed-gather batch receiver that buffers the tuples arriving for each target in a
 * {@link Shuffle} and, when a target finishes, hands that shuffle's read iterator to the
 * wrapped {@link BulkReceiver}.
 *
 * <p>One shuffle is created per key of the map passed to {@code init}. Depending on the
 * {@code sorted} flag the shuffle is either an {@link FSKeyedSortedMerger2} or an
 * {@link FSKeyedMerger}.
 */
public class DKGatherBatchFinalReceiver extends KeyedReceiver {
    /** Receiver that is given the read iterator of a target's shuffle in {@code finishProgress}. */
    private final BulkReceiver bulkReceiver;

    /** Shuffle instance per target id; populated in {@code init}. */
    private final Map<Integer, Shuffle> sortedMergers = new HashMap<>();

    /** Comparator forwarded to {@link FSKeyedSortedMerger2} when {@code sorted} is set. */
    private final Comparator comparator;

    // NOTE(review): created in the constructor but never read inside this class; kept because
    // removing the construction could change behavior not visible from here.
    private final KryoSerializer kryoSerializer;

    /** Directory string forwarded to the shuffle constructors. */
    private final String shuffleDirectory;

    /** Selects {@link FSKeyedSortedMerger2} (true) or {@link FSKeyedMerger} (false). */
    private final boolean sorted;

    /** Forwarded as-is to {@link FSKeyedSortedMerger2}. */
    private final boolean groupByKey;

    /**
     * Creates the receiver; shuffles are not created until {@code init} is called.
     *
     * @param bulkReceiver receiver that gets each target's shuffle iterator
     * @param z stored as the {@code sorted} flag
     * @param i stored in the inherited {@code limitPerKey} field
     * @param str stored as the shuffle directory
     * @param comparator comparator passed to the sorted shuffle
     * @param z2 stored as the {@code groupByKey} flag
     */
    public DKGatherBatchFinalReceiver(BulkReceiver bulkReceiver, boolean z, int i, String str, Comparator comparator, boolean z2) {
        this.bulkReceiver = bulkReceiver;
        this.sorted = z;
        this.groupByKey = z2;
        this.limitPerKey = i;
        // NOTE(review): the flag is switched off here; its meaning is defined by KeyedReceiver.
        this.isFinalBatchReceiver = false;
        this.shuffleDirectory = str;
        this.comparator = comparator;
        this.kryoSerializer = new KryoSerializer();
    }

    /**
     * Runs the superclass initialisation, creates one shuffle per key of {@code map} using the
     * shuffle limits read from {@code config}, and finally initialises the bulk receiver with
     * the same key set.
     *
     * @param config configuration the shuffle limits are read from
     * @param dataFlowOperation operation passed on to the superclass
     * @param map map whose keys are the target ids; the value lists are not inspected here
     */
    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        super.init(config, dataFlowOperation, map);
        long maxBytesInMemory = CommunicationContext.getShuffleMaxBytesInMemory(config);
        long maxRecordsInMemory = CommunicationContext.getShuffleMaxRecordsInMemory(config);
        long maxFileSize = CommunicationContext.getShuffleFileSize(config);
        int parallelIOAllowance = CommunicationContext.getParallelIOAllowance(config);
        for (Integer target : map.keySet()) {
            this.sortedMergers.put(target,
                    createShuffle(target, maxBytesInMemory, maxRecordsInMemory, maxFileSize, parallelIOAllowance));
        }
        this.bulkReceiver.init(config, map.keySet());
    }

    /** Builds the shuffle for one target: sorted merger when {@code sorted} is set, plain merger otherwise. */
    private Shuffle createShuffle(int target, long maxBytesInMemory, long maxRecordsInMemory,
                                  long maxFileSize, int parallelIOAllowance) {
        if (this.sorted) {
            return new FSKeyedSortedMerger2(maxBytesInMemory, maxFileSize, this.shuffleDirectory,
                    getOperationName(target), this.dataFlowOperation.getKeyType(),
                    this.dataFlowOperation.getDataType(), this.comparator, target, this.groupByKey,
                    parallelIOAllowance);
        }
        return new FSKeyedMerger(maxBytesInMemory, maxRecordsInMemory, this.shuffleDirectory,
                getOperationName(target), this.dataFlowOperation.getKeyType(),
                this.dataFlowOperation.getDataType());
    }

    /**
     * Builds a name of the form {@code gather-<operation unique id>-<i>-<random UUID>}.
     * A fresh UUID is generated on every call, so two calls never return the same name.
     */
    private String getOperationName(int i) {
        return "gather-" + this.dataFlowOperation.getUniqueId() + "-" + i + "-" + UUID.randomUUID().toString();
    }

    /**
     * Drains {@code queue} into the shuffle registered for target {@code i}: each polled element
     * is cast to {@link Tuple}, its value is packed to bytes with the operation's data packer,
     * and key plus bytes are added to the shuffle. {@code run()} is invoked on the shuffle once
     * the queue is empty.
     *
     * @param z returned unchanged
     * @param z2 not used by this implementation
     * @param i target id used to look up the shuffle
     * @param queue pending tuples for the target; emptied by this call
     * @return the value of {@code z}
     */
    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    protected boolean sendToTarget(boolean z, boolean z2, int i, Queue<Object> queue) {
        Shuffle shuffle = this.sortedMergers.get(i);
        while (!queue.isEmpty()) {
            Tuple entry = (Tuple) queue.poll();
            byte[] packed = this.dataFlowOperation.getDataType().getDataPacker().packToByteArray(entry.getValue());
            shuffle.add(entry.getKey(), packed, packed.length);
        }
        // Called even when nothing was added; presumably lets the shuffle process what it has
        // buffered so far -- confirm against the Shuffle contract.
        shuffle.run();
        return z;
    }

    /**
     * Marks target {@code i} as done in the inherited {@code batchDone} map, switches its shuffle
     * to reading and passes the shuffle's read iterator to the bulk receiver.
     *
     * @param z returned unchanged
     * @param i target id that finished
     * @return the value of {@code z}
     */
    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    protected boolean finishProgress(boolean z, int i) {
        this.batchDone.put(i, true);
        Shuffle shuffle = this.sortedMergers.get(i);
        shuffle.switchToReading();
        this.bulkReceiver.receive(i, shuffle.readIterator());
        return z;
    }

    /** Always reports {@code true}, regardless of the target id. */
    @Override // edu.iu.dsc.tws.comms.dfw.io.KeyedReceiver
    protected boolean checkIfEmptyIsSent(int i) {
        return true;
    }
}
