package edu.iu.dsc.tws.comms.dfw.io.join;

import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.comms.dfw.io.AggregatedObjects;
import edu.iu.dsc.tws.comms.dfw.io.DFWIOUtils;
import edu.iu.dsc.tws.comms.shuffle.FSKeyedSortedMerger2;
import edu.iu.dsc.tws.comms.shuffle.Shuffle;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/io/join/DJoinBatchFinalReceiver.class */
/**
 * Final receiver for a batch join that buffers incoming keyed tuples in one
 * {@link Shuffle} per target and hands the merged iterator to a
 * {@link BulkReceiver} once both sides of the join have finished.
 *
 * <p>The receiver is shared by two {@link DataFlowOperation}s: the first call to
 * {@link #init} registers the "left" operation and builds all per-target state,
 * the second call only registers the "right" operation.
 */
public class DJoinBatchFinalReceiver implements MessageReceiver {
    private static final Logger LOG = Logger.getLogger(DJoinBatchFinalReceiver.class.getName());

    /**
     * Bit tested in the {@code flags} argument of {@code onMessage} to recognise a
     * "source finished" message (numeric value 67108864).
     * NOTE(review): presumably mirrors the framework's SYNC_EMPTY message flag - confirm.
     */
    private static final int SYNC_EMPTY_FLAG = 1 << 26;

    private final Comparator<Object> comparator;
    private final BulkReceiver receiver;
    protected int executor;
    private DataFlowOperation operationLeft;
    private DataFlowOperation operationRight;
    private int thisWorker;
    private Set<Integer> sources;
    private final List<String> shuffleDirectories;
    private final Map<Integer, Shuffle> sortedMergers = new HashMap<>();
    private final Map<Integer, List<Object>> targetMessagesLeft = new HashMap<>();
    private final Map<Integer, List<Object>> targetMessagesRight = new HashMap<>();
    private final Lock lock = new ReentrantLock();
    private final Map<Integer, Set<Integer>> onFinishedSourcesLeft = new HashMap<>();
    private final Map<Integer, Set<Integer>> onFinishedSourcesRight = new HashMap<>();
    private final Map<Integer, Boolean> targetDone = new HashMap<>();
    private Set<Integer> targets = new HashSet<>();
    private int refresh = 0;
    private final KryoSerializer kryoSerializer = new KryoSerializer();

    /**
     * Iterator that drains a key-to-values map, yielding one (key, values) pair per
     * key and removing the entry from the backing map as it goes.
     */
    private static final class JoinIterator<T extends Pair<?, ?>>
            implements Iterator<Pair<Object, List<Object>>> {
        private final Map<Object, List<Object>> messageMap;
        // LinkedList is kept because, unlike ArrayDeque, it tolerates a null key.
        private final Queue<Object> keyList = new LinkedList<>();

        JoinIterator(Map<Object, List<Object>> map) {
            this.messageMap = map;
            this.keyList.addAll(map.keySet());
        }

        @Override
        public boolean hasNext() {
            return !this.keyList.isEmpty();
        }

        /**
         * Returns the next (key, values) pair and removes that key from the backing map.
         *
         * @throws NoSuchElementException if every key has already been returned
         */
        @Override
        public Pair<Object, List<Object>> next() {
            // Check emptiness rather than a null poll() result: null may be a real key.
            if (this.keyList.isEmpty()) {
                throw new NoSuchElementException();
            }
            Object key = this.keyList.poll();
            return new ImmutablePair<>(key, this.messageMap.remove(key));
        }
    }

    /**
     * Creates the receiver.
     *
     * @param bulkReceiver receiver that is handed the merged iterator of each finished target
     * @param list         shuffle directories; one is picked per target in {@link #init}
     * @param comparator   key comparator passed to each per-target merger
     */
    public DJoinBatchFinalReceiver(BulkReceiver bulkReceiver, List<String> list, Comparator<Object> comparator) {
        this.receiver = bulkReceiver;
        this.comparator = comparator;
        this.shuffleDirectories = list;
    }

    /**
     * Registers an operation with this receiver. The first invocation stores the
     * operation as the left side and creates a merger plus bookkeeping entries for
     * every key of {@code map}; any later invocation only stores the operation as
     * the right side.
     *
     * @param config            configuration the shuffle limits are read from
     * @param dataFlowOperation operation being registered
     * @param map               map whose keys are the target ids handled by this receiver
     */
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        long shuffleMaxBytesInMemory = CommunicationContext.getShuffleMaxBytesInMemory(config);
        long shuffleFileSize = CommunicationContext.getShuffleFileSize(config);
        int parallelIOAllowance = CommunicationContext.getParallelIOAllowance(config);
        if (this.operationLeft != null) {
            // Left side is already set up, so this call belongs to the right side.
            this.operationRight = dataFlowOperation;
            return;
        }
        this.executor = dataFlowOperation.getLogicalPlan().getThisWorker();
        this.thisWorker = dataFlowOperation.getLogicalPlan().getThisWorker();
        this.operationLeft = dataFlowOperation;
        this.sources = dataFlowOperation.getSources();
        this.targets = new HashSet<>(map.keySet());
        for (int target : map.keySet()) {
            // Spread targets over the configured directories by their index within the node.
            int directoryIndex = dataFlowOperation.getLogicalPlan().getIndexOfTaskInNode(target)
                    % this.shuffleDirectories.size();
            this.sortedMergers.put(target, new FSKeyedSortedMerger2(shuffleMaxBytesInMemory, shuffleFileSize,
                    this.shuffleDirectories.get(directoryIndex),
                    DFWIOUtils.getOperationName(target, this.operationLeft, this.refresh),
                    this.operationLeft.getKeyType(), this.operationLeft.getDataType(), this.comparator,
                    target, true, parallelIOAllowance));
            this.targetDone.put(target, false);
            this.targetMessagesLeft.put(target, new AggregatedObjects());
            this.targetMessagesRight.put(target, new AggregatedObjects());
            this.onFinishedSourcesLeft.put(target, new HashSet<>());
            this.onFinishedSourcesRight.put(target, new HashSet<>());
        }
    }

    /**
     * Untagged delivery is not supported by the join receiver.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean onMessage(int i, int i2, int i3, int i4, Object obj) {
        throw new UnsupportedOperationException("Join operation does not support onMessage withouttag");
    }

    /**
     * Handles a tagged message for one target.
     *
     * <p>If {@code i4} carries the finish bit, the source {@code i} is recorded as
     * finished for the side selected by {@code i5}. Otherwise {@code obj} is treated
     * as a list of {@link Tuple}s whose values are packed to bytes and added to the
     * target's merger.
     *
     * @param i   id of the source that sent the message
     * @param i2  not used by this method
     * @param i3  target id; must be one of the targets registered in {@link #init}
     * @param i4  message flags
     * @param i5  tag selecting the join side: {@code 0} for left, anything else for right
     * @param obj list of tuples for a data message; not read for a finish message
     * @return always {@code true}
     * @throws RuntimeException if no merger exists for {@code i3}
     */
    public boolean onMessage(int i, int i2, int i3, int i4, int i5, Object obj) {
        Shuffle shuffle = this.sortedMergers.get(i3);
        if (shuffle == null) {
            throw new RuntimeException("Un-expected target id: " + i3);
        }
        if ((i4 & SYNC_EMPTY_FLAG) == SYNC_EMPTY_FLAG) {
            Map<Integer, Set<Integer>> finishedBySide =
                    i5 == 0 ? this.onFinishedSourcesLeft : this.onFinishedSourcesRight;
            // add() returns false when the source was already recorded.
            if (!finishedBySide.get(i3).add(i)) {
                LOG.log(Level.WARNING, String.format("%d Duplicate finish from source id %d", this.thisWorker, i));
            }
            return true;
        }
        @SuppressWarnings("unchecked") // data messages are delivered as a list of tuples
        List<Tuple> tuples = (List<Tuple>) obj;
        for (Tuple tuple : tuples) {
            // Both sides are packed with the left operation's data type, matching the
            // type the merger was created with in init().
            byte[] packed = this.operationLeft.getDataType().getDataPacker().packToByteArray(tuple.getValue());
            shuffle.add(tuple.getKey(), packed, packed.length);
        }
        return true;
    }

    /**
     * Runs every merger once and, for each target not yet delivered, checks whether
     * it has finished; a finished target is switched to reading and its iterator is
     * passed to the bulk receiver.
     *
     * @return {@code true} if at least one target is still unfinished after this call
     */
    public boolean progress() {
        boolean needsFurtherProgress = false;
        for (Shuffle shuffle : this.sortedMergers.values()) {
            shuffle.run();
        }
        for (Map.Entry<Integer, Boolean> entry : this.targetDone.entrySet()) {
            if (entry.getValue()) {
                continue;
            }
            int target = entry.getKey();
            this.lock.lock();
            try {
                if (checkIfFinished(target)) {
                    Shuffle shuffle = this.sortedMergers.get(target);
                    shuffle.switchToReading();
                    this.receiver.receive(target, shuffle.readIterator());
                    // setValue updates an existing key, so iteration stays valid.
                    entry.setValue(true);
                } else {
                    needsFurtherProgress = true;
                }
            } finally {
                this.lock.unlock();
            }
        }
        return needsFurtherProgress;
    }

    /**
     * Not implemented for this receiver.
     *
     * @throws NotImplementedException always
     */
    public boolean isComplete() {
        throw new NotImplementedException("Not implemented");
    }

    /**
     * Tells whether target {@code i} has received everything: both operations report
     * their delegate complete and the finished-source sets of both sides equal the
     * full source set.
     *
     * @return {@code false} while the right operation has not been registered yet
     */
    private boolean checkIfFinished(int i) {
        if (this.operationRight == null) {
            // init() has only been called for the left side so far; nothing can be finished.
            return false;
        }
        return this.operationLeft.isDelegateComplete()
                && this.operationRight.isDelegateComplete()
                && this.onFinishedSourcesLeft.get(i).equals(this.sources)
                && this.onFinishedSourcesRight.get(i).equals(this.sources);
    }

    /**
     * Increments the refresh counter, which {@link #init} passes to
     * {@code DFWIOUtils.getOperationName} when naming a target's merger.
     */
    public void clean() {
        this.refresh++;
    }
}
