package edu.iu.dsc.tws.comms.dfw.io.partition;

import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.io.DFWIOUtils;
import edu.iu.dsc.tws.comms.dfw.io.ReceiverState;
import edu.iu.dsc.tws.comms.shuffle.FSKeyedMerger;
import edu.iu.dsc.tws.comms.shuffle.FSKeyedSortedMerger2;
import edu.iu.dsc.tws.comms.shuffle.FSMerger;
import edu.iu.dsc.tws.comms.shuffle.Shuffle;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/partition/DPartitionBatchFinalReceiver.class */
/**
 * Final receiver of a batch partition operation that buffers incoming data in
 * {@link Shuffle} mergers (one per target) and hands the merged data to a
 * {@link BulkReceiver} once every source has signalled that it is finished.
 *
 * <p>All state changes in {@link #onMessage} and {@link #progress} are guarded by a
 * single lock that is acquired with {@code tryLock()}; when the lock is busy the
 * call returns without doing any work so the caller can retry later.
 */
public class DPartitionBatchFinalReceiver implements MessageReceiver {
    private static final Logger LOG = Logger.getLogger(DPartitionBatchFinalReceiver.class.getName());

    /** Flag bit (1 &lt;&lt; 26) that marks a message as a "finished" signal from a source. */
    private static final int SYNC_FLAG = 1 << 26;

    /**
     * Flag bit (1 &lt;&lt; 27). When it is set and the data type is OBJECT, keyed values are
     * packed to bytes even if they already arrive as {@code byte[]}.
     * NOTE(review): presumably mirrors a constant of the messaging flags class - confirm.
     */
    private static final int FORCE_PACK_OBJECT_FLAG = 1 << 27;

    private final BulkReceiver bulkReceiver;
    private final boolean sorted;
    private final Comparator<Object> comparator;
    private DataFlowOperation partition;
    private boolean keyed;
    private final List<String> shuffleDirectories;
    private final boolean groupByKey;
    private Map<Integer, List<Integer>> expIds;
    private int[] targetsArray;
    private final Int2ObjectOpenHashMap<Shuffle> sortedMergers = new Int2ObjectOpenHashMap<>();
    private int thisWorker = 0;
    private Int2ObjectOpenHashMap<Set<Integer>> finishedSources = new Int2ObjectOpenHashMap<>();
    private final List<Integer> finishedTargets = new ArrayList<>();
    private final Set<Integer> finishedTargetsCompleted = new HashSet<>();
    private Set<Integer> targets = new HashSet<>();
    private int refresh = 0;
    protected Int2ObjectOpenHashMap<ReceiverState> targetStates = new Int2ObjectOpenHashMap<>();
    private final Lock lock = new ReentrantLock();
    private boolean complete = false;

    /**
     * Creates the receiver.
     *
     * @param bulkReceiver receiver that gets the merged data and the sync event of each target
     * @param sorted whether keyed data is merged with the sorting merger
     * @param shuffleDirectories directories the mergers may use; one is picked per target
     * @param comparator key comparator handed to the sorting merger
     * @param groupByKey handed to the sorting merger
     */
    public DPartitionBatchFinalReceiver(BulkReceiver bulkReceiver, boolean sorted, List<String> shuffleDirectories, Comparator<Object> comparator, boolean groupByKey) {
        this.bulkReceiver = bulkReceiver;
        this.sorted = sorted;
        this.comparator = comparator;
        this.shuffleDirectories = shuffleDirectories;
        this.groupByKey = groupByKey;
    }

    /**
     * Initializes the receiver: reads the shuffle settings from the configuration, creates
     * one merger per target, initializes the bulk receiver and puts every target into the
     * {@code INIT} state.
     *
     * @param config configuration the shuffle limits are read from
     * @param dataFlowOperation the operation this receiver belongs to
     * @param expectedIds map whose key set is the set of targets handled by this receiver
     */
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> expectedIds) {
        long maxBytesInMemory = CommunicationContext.getShuffleMaxBytesInMemory(config);
        long maxRecordsInMemory = CommunicationContext.getShuffleMaxRecordsInMemory(config);
        long maxFileSize = CommunicationContext.getShuffleFileSize(config);
        int parallelIOAllowance = CommunicationContext.getParallelIOAllowance(config);

        this.expIds = expectedIds;
        this.thisWorker = dataFlowOperation.getLogicalPlan().getThisWorker();
        this.finishedSources = new Int2ObjectOpenHashMap<>();
        this.partition = dataFlowOperation;
        this.keyed = this.partition.getKeyType() != null;
        this.targets = new HashSet<>(expectedIds.keySet());

        initMergers(maxBytesInMemory, maxRecordsInMemory, maxFileSize, parallelIOAllowance);
        this.bulkReceiver.init(config, expectedIds.keySet());

        this.targetsArray = new int[expectedIds.keySet().size()];
        int index = 0;
        for (int target : expectedIds.keySet()) {
            this.targetStates.put(target, ReceiverState.INIT);
            this.targetsArray[index++] = target;
        }
    }

    /**
     * Creates the merger and the (empty) finished-source set of every target.
     *
     * @param maxBytesInMemory in-memory byte limit passed to every merger
     * @param maxRecordsInMemory in-memory record limit passed to the non-sorting mergers
     * @param maxFileSize file size passed to the sorting merger
     * @param parallelIOAllowance parallel IO allowance passed to the sorting merger
     */
    private void initMergers(long maxBytesInMemory, long maxRecordsInMemory, long maxFileSize, int parallelIOAllowance) {
        for (int target : this.expIds.keySet()) {
            // Spread the targets of one node over the configured shuffle directories.
            int directoryIndex = this.partition.getLogicalPlan().getIndexOfTaskInNode(target) % this.shuffleDirectories.size();
            String shuffleDirectory = this.shuffleDirectories.get(directoryIndex);
            String operationName = DFWIOUtils.getOperationName(target, this.partition, this.refresh);

            Shuffle merger;
            if (!this.keyed) {
                merger = new FSMerger(maxBytesInMemory, maxRecordsInMemory, shuffleDirectory, operationName, this.partition.getDataType());
            } else if (this.sorted) {
                merger = new FSKeyedSortedMerger2(maxBytesInMemory, maxFileSize, shuffleDirectory, operationName, this.partition.getKeyType(), this.partition.getDataType(), this.comparator, target, this.groupByKey, parallelIOAllowance);
            } else {
                merger = new FSKeyedMerger(maxBytesInMemory, maxRecordsInMemory, shuffleDirectory, operationName, this.partition.getKeyType(), this.partition.getDataType());
            }
            this.sortedMergers.put(target, merger);
            this.finishedSources.put(target, new HashSet<>());
        }
    }

    /**
     * Accepts a message for a target. A message carrying the sync flag records that the
     * source is finished; any other message is added to the merger of the target.
     *
     * @param source id of the source the message originates from
     * @param path not used by this receiver
     * @param target id of the target the message is addressed to
     * @param flags message flags
     * @param object the payload, a list of tuples (keyed) or of values (non-keyed)
     * @return {@code true} when the message was consumed; {@code false} when the lock was
     *     busy or the target has already received all syncs, so the caller should retry
     * @throws RuntimeException if no merger exists for {@code target}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public boolean onMessage(int source, int path, int target, int flags, Object object) {
        if (!this.lock.tryLock()) {
            return false;
        }
        // The lock is released exactly once, in the finally block, on every exit path.
        try {
            Shuffle merger = this.sortedMergers.get(target);
            if (merger == null) {
                throw new RuntimeException("Un-expected target id: " + target);
            }
            if (this.targetStates.get(target) == ReceiverState.INIT) {
                this.targetStates.put(target, ReceiverState.RECEIVING);
            }

            if ((flags & SYNC_FLAG) == SYNC_FLAG) {
                recordFinishedSource(source, target);
                return true;
            }

            ReceiverState state = this.targetStates.get(target);
            if (state == ReceiverState.ALL_SYNCS_RECEIVED || state == ReceiverState.SYNCED) {
                // Data is not accepted once every source has finished for this target.
                return false;
            }

            if (this.keyed) {
                boolean forcePackObject = (flags & FORCE_PACK_OBJECT_FLAG) == FORCE_PACK_OBJECT_FLAG;
                for (Tuple tuple : (List<Tuple>) object) {
                    Object value = tuple.getValue();
                    if (this.partition.getReceiveDataType() != MessageTypes.BYTE_ARRAY
                            || !(value instanceof byte[])
                            || (forcePackObject && this.partition.getDataType() == MessageTypes.OBJECT)) {
                        tuple.setValue(this.partition.getDataType().getDataPacker().packToByteArray(value));
                    }
                    merger.add(tuple);
                }
            } else {
                for (Object value : (List<Object>) object) {
                    byte[] bytes = this.partition.getReceiveDataType() != MessageTypes.BYTE_ARRAY
                            ? this.partition.getDataType().getDataPacker().packToByteArray(value)
                            : (byte[]) value;
                    merger.add(bytes, bytes.length);
                }
            }
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Records a finish signal of {@code source} for {@code target} and, once all sources
     * of the operation have finished, marks the target as having received all syncs.
     */
    private void recordFinishedSource(int source, int target) {
        Set<Integer> finished = this.finishedSources.get(target);
        if (finished.contains(source)) {
            LOG.log(Level.FINE, () -> String.format("%d Duplicate finish from source id %d -> %d", Integer.valueOf(this.thisWorker), Integer.valueOf(source), Integer.valueOf(target)));
        } else {
            finished.add(source);
        }
        if (finished.size() == this.partition.getSources().size()) {
            if (!this.finishedTargets.contains(target)) {
                this.finishedTargets.add(target);
            }
            this.targetStates.put(target, ReceiverState.ALL_SYNCS_RECEIVED);
        }
    }

    /**
     * Runs every merger and finishes the targets whose sources are all done, provided the
     * operation reports that its delegate is complete.
     *
     * @return {@code true} while more progress calls are needed, {@code false} otherwise
     */
    public boolean progress() {
        if (this.lock.tryLock()) {
            // Hold the lock for the whole pass and release it exactly once.
            try {
                boolean pending = false;
                for (int target : this.targetsArray) {
                    this.sortedMergers.get(target).run();
                    if (this.targetStates.get(target) != ReceiverState.SYNCED) {
                        pending = true;
                    }
                }
                if (!pending) {
                    // Every target is already synced; nothing is left to do.
                    return false;
                }
                for (int i = 0; i < this.finishedTargets.size(); i++) {
                    int target = this.finishedTargets.get(i);
                    if (!this.finishedTargetsCompleted.contains(target) && this.partition.isDelegateComplete()) {
                        finishTarget(target);
                        this.targetStates.put(target, ReceiverState.SYNCED);
                        onSyncEvent(target, null);
                        this.finishedTargetsCompleted.add(target);
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }
        this.complete = this.finishedTargetsCompleted.size() == this.targets.size();
        return !this.complete;
    }

    /**
     * @return whether the last {@link #progress()} evaluation found every target completed
     */
    public boolean isComplete() {
        return this.complete;
    }

    /** Switches the merger of the target to reading and passes its iterator to the bulk receiver. */
    private void finishTarget(int target) {
        Shuffle merger = this.sortedMergers.get(target);
        merger.switchToReading();
        this.bulkReceiver.receive(target, merger.readIterator());
    }

    /** Forwards the sync event of the target to the bulk receiver. */
    private void onSyncEvent(int target, byte[] message) {
        this.bulkReceiver.sync(target, message);
    }

    /** Cleans every merger. */
    public void close() {
        cleanMergers();
    }

    /**
     * Resets the receiver for another round: cleans every merger, clears the finish
     * bookkeeping, puts every target back into the {@code INIT} state and increments the
     * refresh counter.
     */
    public void clean() {
        cleanMergers();
        this.finishedTargetsCompleted.clear();
        this.finishedTargets.clear();
        for (Set<Integer> finished : this.finishedSources.values()) {
            finished.clear();
        }
        // Overwriting the value of an existing key does not change the key set being iterated.
        IntIterator targetIterator = this.targetStates.keySet().iterator();
        while (targetIterator.hasNext()) {
            this.targetStates.put(targetIterator.nextInt(), ReceiverState.INIT);
        }
        this.complete = false;
        this.refresh++;
    }

    /** Calls {@code clean()} on the merger of every target. */
    private void cleanMergers() {
        for (Shuffle merger : this.sortedMergers.values()) {
            merger.clean();
        }
    }
}
