package edu.iu.dsc.tws.comms.dfw;

import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.DataFlowOperation;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.channel.ChannelReceiver;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.comms.messaging.MessageHeader;
import edu.iu.dsc.tws.api.comms.messaging.MessageReceiver;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.packing.MessageSchema;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.io.Deserializers;
import edu.iu.dsc.tws.comms.dfw.io.Serializers;
import edu.iu.dsc.tws.comms.routing.DirectRouter;
import edu.iu.dsc.tws.comms.utils.TaskPlanUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/OneToOne.class */
public class OneToOne implements DataFlowOperation, ChannelReceiver {
    private static final Logger LOG = Logger.getLogger(OneToOne.class.getName());
    private List<Integer> sources;
    private List<Integer> targets;
    private int edgeValue;
    private MessageReceiver finalReceiver;
    private ChannelDataFlowOperation delegate;
    private MessageSchema messageSchema;
    private Lock lock;
    private LogicalPlan logicalPlan;
    private DirectRouter router;
    private Map<Integer, RoutingParameters> routes;
    private Config config;
    private MessageType sendType;
    private MessageType recvType;
    private Set<Integer> sourceSet;
    private Set<Integer> targetSet;
    private Set<Integer> pendingFinishSources;
    private Set<Integer> finishedSources;
    private Set<Integer> thisSources;
    private Map<Integer, Integer> sourcesToDestinations;

    public OneToOne(TWSChannel tWSChannel, List<Integer> list, List<Integer> list2, MessageReceiver messageReceiver, Config config, MessageType messageType, MessageType messageType2, LogicalPlan logicalPlan, int i, MessageSchema messageSchema) {
        this.routes = new HashMap();
        this.sourcesToDestinations = new HashMap();
        this.sources = list;
        this.targets = list2;
        this.finalReceiver = messageReceiver;
        this.delegate = new ChannelDataFlowOperation(tWSChannel);
        this.messageSchema = messageSchema;
        this.lock = new ReentrantLock();
        this.edgeValue = i;
        this.logicalPlan = logicalPlan;
        this.router = new DirectRouter(logicalPlan, this.sources, this.targets);
        this.config = config;
        this.sendType = messageType;
        this.recvType = messageType2;
        this.sourceSet = new HashSet(this.sources);
        this.targetSet = new HashSet(list2);
        this.pendingFinishSources = new HashSet();
        this.finishedSources = new HashSet();
        for (int i2 = 0; i2 < list.size(); i2++) {
            this.sourcesToDestinations.put(this.sources.get(i2), list2.get(i2));
        }
        init();
    }

    public OneToOne(TWSChannel tWSChannel, List<Integer> list, List<Integer> list2, MessageReceiver messageReceiver, Config config, MessageType messageType, LogicalPlan logicalPlan, int i, MessageSchema messageSchema) {
        this(tWSChannel, list, list2, messageReceiver, config, messageType, messageType, logicalPlan, i, messageSchema);
    }

    public boolean receiveMessage(MessageHeader messageHeader, Object obj) {
        return this.finalReceiver.onMessage(messageHeader.getSourceId(), 0, messageHeader.getDestinationIdentifier(), messageHeader.getFlags(), obj);
    }

    public boolean receiveSendInternally(int i, int i2, int i3, int i4, Object obj) {
        return this.finalReceiver.onMessage(i, i3, i2, i4, obj);
    }

    protected Map<Integer, List<Integer>> receiveExpectedTaskIds() {
        return this.router.receiveExpectedTaskIds();
    }

    private void init() {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        HashMap hashMap5 = new HashMap();
        this.thisSources = TaskPlanUtils.getTasksOfThisWorker(this.logicalPlan, this.sourceSet);
        Set<Integer> tasksOfThisWorker = TaskPlanUtils.getTasksOfThisWorker(this.logicalPlan, new HashSet(this.targets));
        Iterator<Integer> it = this.thisSources.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            hashMap.put(Integer.valueOf(intValue), new ArrayBlockingQueue(CommunicationContext.sendPendingMax(this.config)));
            hashMap4.put(Integer.valueOf(intValue), Serializers.get(false, this.messageSchema));
        }
        Iterator<Integer> it2 = tasksOfThisWorker.iterator();
        while (it2.hasNext()) {
            int intValue2 = it2.next().intValue();
            hashMap3.put(this.sources.get(this.targets.indexOf(Integer.valueOf(intValue2))), new ArrayBlockingQueue(CommunicationContext.sendPendingMax(this.config)));
            hashMap2.put(this.sources.get(this.targets.indexOf(Integer.valueOf(intValue2))), new ArrayBlockingQueue(CommunicationContext.sendPendingMax(this.config)));
            hashMap5.put(Integer.valueOf(intValue2), Deserializers.get(false, this.messageSchema));
        }
        calculateRoutingParameters();
        this.finalReceiver.init(this.config, this, receiveExpectedTaskIds());
        this.delegate.init(this.config, this.sendType, this.recvType, this.logicalPlan, this.edgeValue, this.router.receivingExecutors(), this, hashMap, hashMap2, hashMap3, hashMap4, hashMap5, false);
    }

    public boolean sendPartial(int i, Object obj, int i2) {
        throw new RuntimeException("This method is not used by direct communication");
    }

    public boolean send(int i, Object obj, int i2) {
        return this.delegate.sendMessage(i, obj, 0, i2, this.routes.get(Integer.valueOf(i)));
    }

    public boolean send(int i, Object obj, int i2, int i3) {
        return this.delegate.sendMessage(i, obj, i3, i2, this.routes.get(Integer.valueOf(i)));
    }

    public boolean sendPartial(int i, Object obj, int i2, int i3) {
        return false;
    }

    public boolean progress() {
        boolean z = false;
        boolean z2 = true;
        try {
            if (this.lock.tryLock()) {
                try {
                    z2 = handleFinish();
                    this.lock.unlock();
                } finally {
                }
            }
            this.delegate.progress();
            if (this.lock.tryLock()) {
                try {
                    z = this.finalReceiver.progress();
                    this.lock.unlock();
                } finally {
                }
            }
            return z || z2;
        } catch (Throwable th) {
            LOG.log(Level.SEVERE, "un-expected error", th);
            throw new RuntimeException(th);
        }
    }

    private boolean handleFinish() {
        Iterator<Integer> it = this.pendingFinishSources.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            if (!this.finishedSources.contains(Integer.valueOf(intValue))) {
                if (!send(intValue, new byte[1], 67108864, this.sourcesToDestinations.get(Integer.valueOf(intValue)).intValue())) {
                    return true;
                }
                this.finishedSources.add(Integer.valueOf(intValue));
            }
        }
        return false;
    }

    public boolean isDelegateComplete() {
        return this.delegate.isComplete();
    }

    public boolean isComplete() {
        if (!this.lock.tryLock()) {
            return true;
        }
        try {
            return this.delegate.isComplete() && this.finalReceiver.isComplete() && !handleFinish();
        } finally {
            this.lock.unlock();
        }
    }

    public void close() {
        if (this.finalReceiver != null) {
            this.finalReceiver.close();
        }
        this.delegate.close();
    }

    public void reset() {
        if (this.finalReceiver != null) {
            this.finalReceiver.clean();
            this.finishedSources.clear();
            this.pendingFinishSources.clear();
        }
    }

    public void finish(int i) {
        if (!this.thisSources.contains(Integer.valueOf(i))) {
            throw new RuntimeException("Invalid source completion: " + i);
        }
        this.lock.lock();
        try {
            this.pendingFinishSources.add(Integer.valueOf(i));
        } finally {
            this.lock.unlock();
        }
    }

    public LogicalPlan getLogicalPlan() {
        return this.logicalPlan;
    }

    public String getUniqueId() {
        return String.valueOf(this.edgeValue);
    }

    private void calculateRoutingParameters() {
        Set logicalIdsOfThisWorker = this.logicalPlan.getLogicalIdsOfThisWorker();
        for (int i = 0; i < this.sources.size(); i++) {
            int intValue = this.sources.get(i).intValue();
            int intValue2 = this.targets.get(i).intValue();
            if (logicalIdsOfThisWorker != null && logicalIdsOfThisWorker.contains(Integer.valueOf(intValue))) {
                this.routes.put(Integer.valueOf(intValue), sendRoutingParameters(intValue, intValue2));
            }
        }
    }

    private RoutingParameters sendRoutingParameters(int i, int i2) {
        RoutingParameters routingParameters = new RoutingParameters();
        Map<Integer, Set<Integer>> internalSendTasks = this.router.getInternalSendTasks(i);
        if (internalSendTasks == null) {
            throw new RuntimeException("Un-expected message from source: " + i);
        }
        Set<Integer> set = internalSendTasks.get(Integer.valueOf(i));
        if (set != null) {
            routingParameters.addInternalRoutes(set);
        }
        Map<Integer, Set<Integer>> externalSendTasks = this.router.getExternalSendTasks(i);
        if (externalSendTasks == null) {
            throw new RuntimeException("Un-expected message from source: " + i);
        }
        Set<Integer> set2 = externalSendTasks.get(Integer.valueOf(i));
        if (set2 != null) {
            routingParameters.addExternalRoutes(set2);
        }
        routingParameters.setDestinationId(i2);
        return routingParameters;
    }

    public MessageType getDataType() {
        return this.sendType;
    }

    public Set<Integer> getSources() {
        return this.sourceSet;
    }

    public Set<Integer> getTargets() {
        return this.targetSet;
    }
}
