package edu.iu.dsc.tws.comms.dfw.io.join;

import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.MToNSimple;
import edu.iu.dsc.tws.comms.dfw.io.AggregatedObjects;
import edu.iu.dsc.tws.comms.utils.JoinUtils;
import edu.iu.dsc.tws.comms.utils.KeyComparatorWrapper;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Final receiver for a two-sided (left / right) batch join.
 *
 * <p>Incoming messages are tagged {@code 0} (left) or {@code 1} (right). Data messages are
 * buffered per target and per side; messages carrying the finish flag record that the sending
 * source is done for that target and side. {@link #progress()} checks every target that has not
 * been emitted yet and, once both sides are finished, passes the buffered tuples to
 * {@code JoinUtils.innerJoin} and hands the resulting iterator to the configured
 * {@link BulkReceiver}.
 *
 * <p>All access to the per-target buffers and finished-source sets in
 * {@link #onMessage(int, int, int, int, int, Object)} and {@link #progress()} is guarded by a
 * single {@link ReentrantLock}.
 */
public class JoinBatchFinalReceiver implements MessageReceiver {
    private static final Logger LOG = Logger.getLogger(JoinBatchFinalReceiver.class.getName());

    /**
     * Bit tested in the flags argument of the tagged {@code onMessage}; when it is set the message
     * is treated as a finish marker from the sending source rather than as data (67108864).
     */
    private static final int FINISH_FLAG = 1 << 26;

    /** Tag value identifying a message that belongs to the left side of the join. */
    private static final int TAG_LEFT = 0;

    /** Tag value identifying a message that belongs to the right side of the join. */
    private static final int TAG_RIGHT = 1;

    private final BulkReceiver receiver;
    protected int executor;
    private DataFlowOperation operationLeft;
    private DataFlowOperation operationRight;
    private int thisWorker;
    private Set<Integer> sources;
    private final KeyComparatorWrapper comparator;

    /** Buffered left-side tuples, keyed by target id. */
    private final Map<Integer, List<Tuple>> targetMessagesLeft = new HashMap<>();

    /** Buffered right-side tuples, keyed by target id. */
    private final Map<Integer, List<Tuple>> targetMessagesRight = new HashMap<>();

    /** Guards the message buffers and finished-source sets. */
    private final Lock lock = new ReentrantLock();

    /** Sources that have signalled finish on the left side, keyed by target id. */
    private final Map<Integer, Set<Integer>> onFinishedSourcesLeft = new HashMap<>();

    /** Sources that have signalled finish on the right side, keyed by target id. */
    private final Map<Integer, Set<Integer>> onFinishedSourcesRight = new HashMap<>();

    /** Whether the join result of a target has already been delivered, keyed by target id. */
    private final Map<Integer, Boolean> targetDone = new HashMap<>();

    /**
     * Iterator that drains a key-to-values map, yielding one {@code (key, values)} pair per key.
     * Each call to {@link #next()} removes the returned key's entry from the backing map.
     */
    private static class JoinIterator<T extends Pair> implements Iterator<Pair> {
        private final Map<Object, List<Object>> messageMap;
        private final Queue<Object> keyList = new LinkedList<>();

        JoinIterator(Map<Object, List<Object>> map) {
            this.messageMap = map;
            // Snapshot the keys up front so entries can be removed from the map while iterating.
            this.keyList.addAll(map.keySet());
        }

        @Override
        public boolean hasNext() {
            return !this.keyList.isEmpty();
        }

        /**
         * Returns the next key together with its values and removes that entry from the map.
         * Must be named {@code next} to actually implement {@link Iterator}.
         */
        @Override
        public Pair next() {
            Object key = this.keyList.poll();
            return new ImmutablePair<>(key, this.messageMap.remove(key));
        }
    }

    /**
     * Creates the receiver.
     *
     * @param bulkReceiver receiver that is handed the join result of each target
     * @param comparator key comparator; wrapped in a {@link KeyComparatorWrapper} and passed to
     *     the join
     */
    public JoinBatchFinalReceiver(BulkReceiver bulkReceiver, Comparator<Object> comparator) {
        this.receiver = bulkReceiver;
        this.comparator = new KeyComparatorWrapper(comparator);
    }

    /**
     * Registers an operation with this receiver.
     *
     * <p>The first call registers {@code dataFlowOperation} as the left operation, reads the
     * worker id and the source set from it, and creates the per-target state for every key of
     * {@code map}. Any later call only stores {@code dataFlowOperation} as the right operation.
     *
     * @param config configuration; not read by this method
     * @param dataFlowOperation the operation being registered; on the first call it is cast to
     *     {@link MToNSimple} to obtain the sources
     * @param map its keys are used as the target ids handled by this receiver
     */
    public void init(Config config, DataFlowOperation dataFlowOperation, Map<Integer, List<Integer>> map) {
        if (this.operationLeft != null) {
            // Left side is already set up, so this call is for the right side.
            this.operationRight = dataFlowOperation;
            return;
        }
        this.executor = dataFlowOperation.getLogicalPlan().getThisWorker();
        this.thisWorker = dataFlowOperation.getLogicalPlan().getThisWorker();
        this.operationLeft = dataFlowOperation;
        this.sources = ((MToNSimple) dataFlowOperation).getSources();
        for (int target : map.keySet()) {
            this.targetDone.put(target, false);
            this.targetMessagesLeft.put(target, new AggregatedObjects());
            this.targetMessagesRight.put(target, new AggregatedObjects());
            this.onFinishedSourcesLeft.put(target, new HashSet<>());
            this.onFinishedSourcesRight.put(target, new HashSet<>());
        }
    }

    /**
     * Untagged messages are not supported by the join receiver.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean onMessage(int i, int i2, int i3, int i4, Object obj) {
        throw new UnsupportedOperationException("Join operation does not support onMessage withouttag");
    }

    /**
     * Accepts a tagged message for one side of the join.
     *
     * @param i id of the source that sent the message
     * @param i2 not used by this receiver
     * @param i3 id of the target the message is addressed to
     * @param i4 message flags; if the finish bit is set the source is recorded as finished for
     *     the target and {@code obj} is ignored
     * @param i5 tag selecting the side: {@code 0} for left, {@code 1} for right
     * @param obj either a {@link List} of tuples or a single {@link Tuple}
     * @return always {@code true}
     * @throws RuntimeException if the tag is neither 0 nor 1, or if a data message is addressed
     *     to a target that was not registered in {@code init}
     */
    public boolean onMessage(int i, int i2, int i3, int i4, int i5, Object obj) {
        final int source = i;
        final int target = i3;
        final int flags = i4;
        final int tag = i5;

        if (tag != TAG_LEFT && tag != TAG_RIGHT) {
            throw new RuntimeException("Tag value must be either 0(left) or 1(right) for join operation");
        }
        final boolean leftSide = tag == TAG_LEFT;
        final Map<Integer, List<Tuple>> messages =
                leftSide ? this.targetMessagesLeft : this.targetMessagesRight;
        final Map<Integer, Set<Integer>> finished =
                leftSide ? this.onFinishedSourcesLeft : this.onFinishedSourcesRight;

        this.lock.lock();
        try {
            Set<Integer> finishedSources = finished.get(target);
            if ((flags & FINISH_FLAG) == FINISH_FLAG) {
                // Set.add returns false when the source had already reported finish.
                if (!finishedSources.add(source)) {
                    LOG.log(Level.WARNING,
                            String.format("%d Duplicate finish from source id %d", this.thisWorker, source));
                }
                return true;
            }

            List<Tuple> buffered = messages.get(target);
            if (buffered == null) {
                throw new RuntimeException(
                        String.format("%d target not exisits %d", this.executor, target));
            }
            if (obj instanceof List) {
                @SuppressWarnings("unchecked") // element type cannot be checked at runtime
                Collection<? extends Tuple> batch = (Collection<? extends Tuple>) obj;
                buffered.addAll(batch);
            } else {
                buffered.add((Tuple) obj);
            }
            return true;
        } finally {
            // Single release point: unlocking anywhere else as well would release the lock twice
            // and make ReentrantLock throw IllegalMonitorStateException.
            this.lock.unlock();
        }
    }

    /**
     * Delivers the join result for every target whose two sides have both finished.
     *
     * <p>For each such target the buffered left and right tuples are joined with
     * {@code JoinUtils.innerJoin}, the result iterator is passed to the {@link BulkReceiver},
     * the target is marked done and {@code sync} is called on the receiver.
     *
     * @return {@code true} if at least one not-yet-delivered target is still unfinished,
     *     {@code false} otherwise
     */
    public boolean progress() {
        boolean needsFurtherProgress = false;
        for (Map.Entry<Integer, Boolean> entry : this.targetDone.entrySet()) {
            if (entry.getValue()) {
                continue;
            }
            final int target = entry.getKey();
            this.lock.lock();
            try {
                if (checkIfFinished(target)) {
                    List<Tuple> left = this.targetMessagesLeft.get(target);
                    List<Tuple> right = this.targetMessagesRight.get(target);
                    this.receiver.receive(target,
                            JoinUtils.innerJoin(left, right, this.comparator).iterator());
                    // Updating the value of an existing entry does not invalidate the iteration.
                    entry.setValue(true);
                    this.receiver.sync(target, (byte[]) null);
                } else {
                    needsFurtherProgress = true;
                }
            } finally {
                this.lock.unlock();
            }
        }
        return needsFurtherProgress;
    }

    /**
     * Not supported by this receiver.
     *
     * @throws NotImplementedException always
     */
    public boolean isComplete() {
        throw new NotImplementedException("Not implemented");
    }

    /**
     * Tells whether both sides of the join are complete for the given target: both operations
     * report their delegate as complete and, on each side, the set of sources that signalled
     * finish equals the full source set.
     *
     * @param i target id
     * @return {@code true} if the join for the target can be executed
     */
    private boolean checkIfFinished(int i) {
        if (!this.operationLeft.isDelegateComplete() || !this.operationRight.isDelegateComplete()) {
            return false;
        }
        return this.onFinishedSourcesLeft.get(i).equals(this.sources)
                && this.onFinishedSourcesRight.get(i).equals(this.sources);
    }
}
