package edu.iu.dsc.tws.comms.dfw;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.channel.ChannelReceiver;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.comms.messaging.MessageHeader;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.packing.MessageSchema;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.io.Deserializers;
import edu.iu.dsc.tws.comms.dfw.io.Serializers;
import edu.iu.dsc.tws.comms.routing.InvertedBinaryTreeRouter;
import edu.iu.dsc.tws.comms.utils.OperationUtils;
import edu.iu.dsc.tws.comms.utils.TaskPlanUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/MToOneTree.class */
/**
 * Dataflow operation in which messages from a set of source tasks are sent towards a single
 * destination task.
 *
 * <p>Routing information comes from an {@link InvertedBinaryTreeRouter} that is created in
 * {@link #init(Config, MessageType, MessageType, MessageType, MessageType, LogicalPlan, int)};
 * the actual sending is delegated to a {@code ChannelDataFlowOperation}. Incoming messages are
 * handed to the final receiver when the router reports this instance as the last receiver (or
 * when no partial receiver was supplied), and to the partial receiver otherwise.
 *
 * <p>One of the {@code init} methods must be called before any send/receive/progress method,
 * because those methods dereference the router and logical plan that {@code init} sets.
 */
public class MToOneTree implements DataFlowOperation, ChannelReceiver {
    private static final Logger LOG = Logger.getLogger(MToOneTree.class.getName());

    /**
     * Flags value sent together with an empty payload by {@link #finish(int)} (bit 26 set,
     * i.e. 67108864).
     * NOTE(review): presumably mirrors a framework-level "sync / empty message" flag - confirm
     * against the flag definitions the receivers use.
     */
    private static final int FINISH_FLAGS = 1 << 26;

    /** Source task ids of this operation. */
    protected Set<Integer> sources;
    /** Single-element set holding {@link #destination}. */
    private final Set<Integer> targets;
    /** Destination task id. */
    protected int destination;
    /** Edge value passed to {@code init}; exposed as a string by {@link #getUniqueId()}. */
    private int edgeValue;
    /** Created in {@code init}; {@code null} before that. */
    private InvertedBinaryTreeRouter router;
    private final MessageReceiver finalReceiver;
    private final MessageReceiver partialReceiver;
    /** Index handed to the router constructor. */
    private final int index;
    /** Default path used by the send methods that take no explicit target argument. */
    private final int pathToUse;
    private final ChannelDataFlowOperation delegete;
    private final MessageSchema messageSchema;
    /** Set in {@code init}; {@code null} before that. */
    private LogicalPlan instancePlan;
    private MessageType dataType;
    private MessageType keyType;
    private final boolean isKeyed;
    /** Cache of routing parameters for full sends, keyed by (source, path). */
    private final Table<Integer, Integer, RoutingParameters> routingParamCache = HashBasedTable.create();
    /** Cache of routing parameters for partial sends, keyed by (source, path). */
    private final Table<Integer, Integer, RoutingParameters> partialRoutingParamCache = HashBasedTable.create();
    /** Lock handed to {@code OperationUtils} together with the final receiver. */
    private final Lock lock = new ReentrantLock();
    /** Lock handed to {@code OperationUtils} together with the partial receiver. */
    private final Lock partialLock = new ReentrantLock();

    /**
     * Creates a non-keyed operation; key and data types stay {@code null} until {@code init}.
     *
     * @param tWSChannel channel wrapped by the delegate operation
     * @param set source task ids
     * @param i destination task id
     * @param messageReceiver final receiver
     * @param messageReceiver2 partial receiver, may be {@code null}
     * @param i2 index passed to the router
     * @param i3 default path used when sending
     * @param messageSchema schema used to obtain serializers and deserializers
     */
    public MToOneTree(TWSChannel tWSChannel, Set<Integer> set, int i, MessageReceiver messageReceiver, MessageReceiver messageReceiver2, int i2, int i3, MessageSchema messageSchema) {
        this(tWSChannel, set, i, messageReceiver, messageReceiver2, i2, i3, false, null, null, messageSchema);
    }

    /**
     * Creates an operation with explicit keyed-ness and message types.
     *
     * @param tWSChannel channel wrapped by the delegate operation
     * @param set source task ids
     * @param i destination task id
     * @param messageReceiver final receiver
     * @param messageReceiver2 partial receiver, may be {@code null}
     * @param i2 index passed to the router
     * @param i3 default path used when sending
     * @param z whether messages are keyed
     * @param messageType key type
     * @param messageType2 data type
     * @param messageSchema schema used to obtain serializers and deserializers
     */
    public MToOneTree(TWSChannel tWSChannel, Set<Integer> set, int i, MessageReceiver messageReceiver, MessageReceiver messageReceiver2, int i2, int i3, boolean z, MessageType messageType, MessageType messageType2, MessageSchema messageSchema) {
        this.index = i2;
        this.sources = set;
        this.destination = i;
        this.finalReceiver = messageReceiver;
        this.partialReceiver = messageReceiver2;
        this.pathToUse = i3;
        this.delegete = new ChannelDataFlowOperation(tWSChannel);
        this.isKeyed = z;
        this.keyType = messageType;
        this.dataType = messageType2;
        this.messageSchema = messageSchema;
        this.targets = new HashSet<>();
        this.targets.add(i);
    }

    /**
     * Creates a non-keyed operation with index {@code 0} and default path {@code 0}.
     *
     * @param tWSChannel channel wrapped by the delegate operation
     * @param set source task ids
     * @param i destination task id
     * @param messageReceiver final receiver
     * @param messageReceiver2 partial receiver, may be {@code null}
     * @param messageSchema schema used to obtain serializers and deserializers
     */
    public MToOneTree(TWSChannel tWSChannel, Set<Integer> set, int i, MessageReceiver messageReceiver, MessageReceiver messageReceiver2, MessageSchema messageSchema) {
        this(tWSChannel, set, i, messageReceiver, messageReceiver2, 0, 0, messageSchema);
    }

    /**
     * Hands a message received from the channel to the appropriate receiver.
     *
     * @param messageHeader header supplying the source id and flags
     * @param obj message payload
     * @return whatever the chosen receiver's {@code onMessage} returns
     */
    public boolean receiveMessage(MessageHeader messageHeader, Object obj) {
        MessageReceiver receiver = selectReceiver();
        // The message is addressed to the main task of this worker as reported by the router.
        return receiver.onMessage(
                messageHeader.getSourceId(),
                0,
                this.router.mainTaskOfExecutor(this.instancePlan.getThisWorker(), 0),
                messageHeader.getFlags(),
                obj);
    }

    /**
     * Picks the receiver for an incoming message: the final receiver when the router reports
     * this as the last receiver or when there is no partial receiver, the partial one otherwise.
     * NOTE(review): {@code init} only initializes the final receiver when the router reports
     * the last receiver, so the "no partial receiver" fallback may hit an uninitialized final
     * receiver - confirm that configuration is expected.
     */
    private MessageReceiver selectReceiver() {
        if (this.router.isLastReceiver() || this.partialReceiver == null) {
            return this.finalReceiver;
        }
        return this.partialReceiver;
    }

    /**
     * Returns (and caches) the routing parameters for a partial send.
     *
     * @param source source task id
     * @param path path used for the destination identifier and as cache column key
     * @throws RuntimeException if the router has no internal or external-partial send tasks
     *     for {@code source}
     */
    private RoutingParameters partialSendRoutingParameters(int source, int path) {
        // Values stored in the cache are never null, so a null result means "not cached yet".
        RoutingParameters cached = this.partialRoutingParamCache.get(source, path);
        if (cached != null) {
            return cached;
        }
        RoutingParameters params = new RoutingParameters();
        Map<Integer, Set<Integer>> internalSendTasks = this.router.getInternalSendTasks(source);
        if (internalSendTasks == null) {
            throw new RuntimeException("Un-expected message from source: " + source);
        }
        Set<Integer> internalRoutes = internalSendTasks.get(source);
        if (internalRoutes != null) {
            params.addInternalRoutes(internalRoutes);
        }
        Map<Integer, Set<Integer>> externalSendTasks = this.router.getExternalSendTasksForPartial(source);
        if (externalSendTasks == null) {
            throw new RuntimeException("Un-expected message from source: " + source);
        }
        Set<Integer> externalRoutes = externalSendTasks.get(source);
        if (externalRoutes != null) {
            params.addExternalRoutes(externalRoutes);
        }
        params.setDestinationId(this.router.destinationIdentifier(source, path));
        this.partialRoutingParamCache.put(source, path, params);
        return params;
    }

    /**
     * Returns (and caches) the routing parameters for a full send. Only internal routes are
     * added here; when {@code source} is the main task of this worker it is routed to itself.
     *
     * @param source source task id
     * @param path path used for the destination identifier and as cache column key
     * @throws RuntimeException if the router has no internal send tasks for {@code source}
     */
    private RoutingParameters sendRoutingParameters(int source, int path) {
        // Values stored in the cache are never null, so a null result means "not cached yet".
        RoutingParameters cached = this.routingParamCache.get(source, path);
        if (cached != null) {
            return cached;
        }
        RoutingParameters params = new RoutingParameters();
        Map<Integer, Set<Integer>> internalSendTasks = this.router.getInternalSendTasks(source);
        if (internalSendTasks == null) {
            throw new RuntimeException("Un-expected message from source: " + source);
        }
        if (this.router.mainTaskOfExecutor(this.instancePlan.getThisWorker(), 0) == source) {
            params.addInteranlRoute(source);
        }
        Set<Integer> internalRoutes = internalSendTasks.get(source);
        if (internalRoutes != null) {
            params.addInternalRoutes(internalRoutes);
        }
        params.setDestinationId(this.router.destinationIdentifier(source, path));
        this.routingParamCache.put(source, path, params);
        return params;
    }

    /**
     * Hands an internally routed message to the appropriate receiver, using the same receiver
     * selection as {@link #receiveMessage(MessageHeader, Object)} so that a missing partial
     * receiver falls back to the final receiver instead of failing with a
     * {@code NullPointerException}.
     *
     * @param i first argument forwarded to {@code onMessage}
     * @param i2 forwarded as the third {@code onMessage} argument
     * @param i3 forwarded as the second {@code onMessage} argument
     * @param i4 forwarded as the fourth {@code onMessage} argument
     * @param obj message payload
     * @return whatever the chosen receiver's {@code onMessage} returns
     */
    public boolean receiveSendInternally(int i, int i2, int i3, int i4, Object obj) {
        return selectReceiver().onMessage(i, i3, i2, i4, obj);
    }

    /**
     * Sends a message from the given source along the default path.
     *
     * @param i source task id
     * @param obj message payload
     * @param i2 flags forwarded to the delegate
     * @return the delegate's {@code sendMessage} result
     */
    public boolean send(int i, Object obj, int i2) {
        return this.delegete.sendMessage(i, obj, this.pathToUse, i2, sendRoutingParameters(i, this.pathToUse));
    }

    /**
     * Sends a message from the given source, forwarding {@code i3} to the delegate in place of
     * the default path.
     * NOTE(review): the routing parameters are still looked up with the default path rather
     * than {@code i3}, unlike {@link #sendPartial(int, Object, int, int)} - confirm this is
     * intended.
     *
     * @param i source task id
     * @param obj message payload
     * @param i2 flags forwarded to the delegate
     * @param i3 value forwarded to the delegate instead of the default path
     * @return the delegate's {@code sendMessage} result
     */
    public boolean send(int i, Object obj, int i2, int i3) {
        return this.delegete.sendMessage(i, obj, i3, i2, sendRoutingParameters(i, this.pathToUse));
    }

    /**
     * Sends a partial message from the given source using {@code i3} both for the delegate
     * call and for the routing-parameter lookup.
     *
     * @param i source task id
     * @param obj message payload
     * @param i2 flags forwarded to the delegate
     * @param i3 value used instead of the default path
     * @return the delegate's {@code sendMessagePartial} result
     */
    public boolean sendPartial(int i, Object obj, int i2, int i3) {
        return this.delegete.sendMessagePartial(i, obj, i3, i2, partialSendRoutingParameters(i, i3));
    }

    /**
     * Initializes the operation: creates the router, initializes the receiver relevant for this
     * worker, allocates per-source send/receive queues and (de)serializers, pre-computes the
     * routing parameters for the sources hosted on this worker, and initializes the delegate.
     *
     * @param config configuration used for queue sizing and passed on to receivers/delegate
     * @param messageType data type; also stored as this operation's data type
     * @param messageType2 forwarded unchanged to the delegate
     * @param messageType3 key type; also stored as this operation's key type
     * @param messageType4 forwarded unchanged to the delegate
     * @param logicalPlan logical plan of the job
     * @param i edge value; forwarded to the delegate and exposed via {@link #getUniqueId()}
     */
    // The queue and (de)serializer maps stay raw: their element types are declared in project
    // classes that this file does not import, and the delegate's init defines them.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void init(Config config, MessageType messageType, MessageType messageType2, MessageType messageType3, MessageType messageType4, LogicalPlan logicalPlan, int i) {
        this.instancePlan = logicalPlan;
        this.dataType = messageType;
        this.keyType = messageType3;
        int thisWorker = this.instancePlan.getThisWorker();
        this.edgeValue = i;
        this.router = new InvertedBinaryTreeRouter(config, logicalPlan, this.destination, this.sources, this.index);

        // Only the receiver that will actually be used on this worker is initialized.
        if (this.partialReceiver != null && !this.router.isLastReceiver()) {
            this.partialReceiver.init(config, this, receiveExpectedTaskIds());
        }
        if (this.finalReceiver != null && this.router.isLastReceiver()) {
            this.finalReceiver.init(config, this, receiveExpectedTaskIds());
        }
        LOG.log(Level.FINE, String.format("%d reduce sources %s dest %d send tasks: %s", thisWorker, this.sources, this.destination, this.router.sendQueueIds()));

        HashMap sendQueues = new HashMap();
        HashMap receiveQueues = new HashMap();
        HashMap receiveQueues2 = new HashMap();
        HashMap serializers = new HashMap();
        HashMap deserializers = new HashMap();

        // One bounded send queue and one serializer per send queue id.
        for (int sendId : this.router.sendQueueIds()) {
            sendQueues.put(sendId, new ArrayBlockingQueue(CommunicationContext.sendPendingMax(config)));
            serializers.put(sendId, Serializers.get(this.isKeyed, this.messageSchema));
        }

        int receiveBufferCount = CommunicationContext.receiveBufferCount(config);
        int executorCount = receivingExecutors().size();
        if (executorCount == 0) {
            // Keep the queue capacity positive even when nothing is received from other executors.
            executorCount = 1;
        }
        int receiveCapacity = receiveBufferCount * 2 * executorCount;

        // Two independently allocated bounded queues and one deserializer per receive source;
        // the role of each queue map is defined by the delegate's init.
        for (int receiveSource : this.router.getReceiveSources()) {
            receiveQueues.put(receiveSource, new ArrayBlockingQueue(receiveCapacity));
            receiveQueues2.put(receiveSource, new ArrayBlockingQueue(receiveCapacity));
            deserializers.put(receiveSource, Deserializers.get(this.isKeyed, this.messageSchema));
        }

        // Populate both routing caches up front for every source task hosted on this worker.
        for (int localSource : TaskPlanUtils.getTasksOfThisWorker(logicalPlan, this.sources)) {
            sendRoutingParameters(localSource, this.pathToUse);
            partialSendRoutingParameters(localSource, this.pathToUse);
        }

        this.delegete.init(config, messageType, messageType2, messageType3, messageType4, logicalPlan, i, this.router.receivingExecutors(), this, sendQueues, receiveQueues, receiveQueues2, serializers, deserializers, this.isKeyed);
    }

    /**
     * Initializes the operation using {@code messageType} for both data-type arguments and the
     * key type already held by this instance for both key-type arguments.
     *
     * @param config configuration
     * @param messageType data type
     * @param logicalPlan logical plan of the job
     * @param i edge value
     */
    public void init(Config config, MessageType messageType, LogicalPlan logicalPlan, int i) {
        init(config, messageType, messageType, this.keyType, this.keyType, logicalPlan, i);
    }

    /**
     * Sends a partial message from the given source along the default path.
     *
     * @param i source task id
     * @param obj message payload
     * @param i2 flags forwarded to the delegate
     * @return the delegate's {@code sendMessagePartial} result
     */
    public boolean sendPartial(int i, Object obj, int i2) {
        return this.delegete.sendMessagePartial(i, obj, this.pathToUse, i2, partialSendRoutingParameters(i, this.pathToUse));
    }

    /** Returns the receiving executors reported by the router. */
    protected Set<Integer> receivingExecutors() {
        return this.router.receivingExecutors();
    }

    /**
     * Returns the expected task ids computed by {@code OperationUtils.getIntegerListMap} from
     * the router, the logical plan and the destination.
     */
    public Map<Integer, List<Integer>> receiveExpectedTaskIds() {
        return OperationUtils.getIntegerListMap(this.router, this.instancePlan, this.destination);
    }

    /** Returns whether the delegate operation reports completion. */
    public boolean isDelegateComplete() {
        return this.delegete.isComplete();
    }

    /**
     * Returns {@code true} when the delegate is complete and
     * {@code OperationUtils.areReceiversComplete} reports the receivers complete; the receiver
     * check is skipped while the delegate is still incomplete.
     */
    public boolean isComplete() {
        return this.delegete.isComplete() && OperationUtils.areReceiversComplete(this.lock, this.finalReceiver, this.partialLock, this.partialReceiver);
    }

    /** Progresses the delegate and the receivers via {@code OperationUtils.progressReceivers}. */
    public boolean progress() {
        return OperationUtils.progressReceivers(this.delegete, this.lock, this.finalReceiver, this.partialLock, this.partialReceiver);
    }

    /**
     * Closes the final receiver, the partial receiver and the delegate, in that order. A failure
     * while closing one of them no longer prevents the remaining ones from being closed; the
     * exception still propagates to the caller.
     */
    public void close() {
        try {
            if (this.finalReceiver != null) {
                this.finalReceiver.close();
            }
        } finally {
            try {
                if (this.partialReceiver != null) {
                    this.partialReceiver.close();
                }
            } finally {
                this.delegete.close();
            }
        }
    }

    /** Calls {@code clean()} on the partial receiver and then on the final receiver, if present. */
    public void reset() {
        if (this.partialReceiver != null) {
            this.partialReceiver.clean();
        }
        if (this.finalReceiver != null) {
            this.finalReceiver.clean();
        }
    }

    /**
     * Sends an empty byte array flagged with {@link #FINISH_FLAGS} from the given source,
     * calling {@link #progress()} after every rejected attempt until the send returns
     * {@code true}.
     *
     * @param i source task id
     */
    public void finish(int i) {
        while (!send(i, new byte[0], FINISH_FLAGS)) {
            progress();
        }
    }

    /** Returns the key type set at construction or by the last {@code init} call. */
    public MessageType getKeyType() {
        return this.keyType;
    }

    /** Returns the data type set at construction or by the last {@code init} call. */
    public MessageType getDataType() {
        return this.dataType;
    }

    /** Returns the logical plan passed to {@code init}, or {@code null} before that. */
    public LogicalPlan getLogicalPlan() {
        return this.instancePlan;
    }

    /** Returns the edge value passed to {@code init}, rendered as a string. */
    public String getUniqueId() {
        return String.valueOf(this.edgeValue);
    }

    /** Returns the source task ids (the same set instance that was passed in). */
    public Set<Integer> getSources() {
        return this.sources;
    }

    /** Returns the set containing only the destination task id. */
    public Set<Integer> getTargets() {
        return this.targets;
    }
}
