package edu.iu.dsc.tws.comms.mpi;

import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.channel.ChannelListener;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.comms.messaging.ChannelMessage;
import edu.iu.dsc.tws.api.comms.packing.DataBuffer;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.config.MPIContext;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.common.util.IterativeLinkedList;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import mpi.Intracomm;
import mpi.MPI;
import mpi.MPIException;
import mpi.Request;
import mpi.Status;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

/* loaded from: input_file:edu/iu/dsc/tws/comms/mpi/TWSMPIChannel.class */
public class TWSMPIChannel implements TWSChannel {
    private static final Logger LOG = Logger.getLogger(TWSMPIChannel.class.getName());
    private final Intracomm comm;
    private ArrayBlockingQueue<MPISendRequests> pendingSends;
    private List<MPIReceiveRequests> registeredReceives;
    private Int2ObjectArrayMap<List<MPIReceiveRequests>> groupedRegisteredReceives;
    private IterativeLinkedList<MPISendRequests> waitForCompletionSends;
    private int workerId;
    private List<Pair<Integer, Integer>> pendingCloseRequests = new ArrayList();
    private int sendCount = 0;
    private int completedSendCount = 0;
    private int receiveCount = 0;
    private int pendingReceiveCount = 0;
    private boolean debug = false;
    private int completedReceives = 0;
    private List<ByteBuffer> buffers = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/mpi/TWSMPIChannel$MPIReceiveRequests.class */
    public class MPIReceiveRequests {
        IterativeLinkedList<MPIRequest> pendingRequests = new IterativeLinkedList<>();
        int rank;
        int edge;
        ChannelListener callback;
        Queue<DataBuffer> availableBuffers;

        MPIReceiveRequests(int i, int i2, ChannelListener channelListener, Queue<DataBuffer> queue) {
            this.rank = i;
            this.edge = i2;
            this.callback = channelListener;
            this.availableBuffers = queue;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/mpi/TWSMPIChannel$MPIRequest.class */
    public class MPIRequest {
        Request request;
        DataBuffer buffer;

        MPIRequest(Request request, DataBuffer dataBuffer) {
            this.request = request;
            this.buffer = dataBuffer;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:edu/iu/dsc/tws/comms/mpi/TWSMPIChannel$MPISendRequests.class */
    public class MPISendRequests {
        IterativeLinkedList<MPIRequest> pendingSends = new IterativeLinkedList<>();
        int rank;
        int edge;
        ChannelMessage message;
        ChannelListener callback;

        MPISendRequests(int i, int i2, ChannelMessage channelMessage, ChannelListener channelListener) {
            this.rank = i;
            this.edge = i2;
            this.message = channelMessage;
            this.callback = channelListener;
        }
    }

    public TWSMPIChannel(Config config, IWorkerController iWorkerController) {
        Object runtimeObject = MPIContext.getRuntimeObject("comm");
        if (runtimeObject == null) {
            this.comm = MPI.COMM_WORLD;
        } else {
            this.comm = (Intracomm) runtimeObject;
        }
        this.pendingSends = new ArrayBlockingQueue<>(CommunicationContext.networkChannelPendingSize(config));
        this.registeredReceives = new ArrayList(1024);
        this.groupedRegisteredReceives = new Int2ObjectArrayMap<>();
        this.waitForCompletionSends = new IterativeLinkedList<>();
        this.workerId = iWorkerController.getWorkerInfo().getWorkerID();
    }

    public boolean sendMessage(int i, ChannelMessage channelMessage, ChannelListener channelListener) {
        return this.pendingSends.offer(new MPISendRequests(i, channelMessage.getHeader().getEdge(), channelMessage, channelListener));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17, types: [java.util.List] */
    public boolean receiveMessage(int i, int i2, int i3, ChannelListener channelListener, Queue<DataBuffer> queue) {
        ArrayList arrayList;
        MPIReceiveRequests mPIReceiveRequests = new MPIReceiveRequests(i2, i3, channelListener, queue);
        this.registeredReceives.add(mPIReceiveRequests);
        if (this.groupedRegisteredReceives.containsKey(i)) {
            arrayList = (List) this.groupedRegisteredReceives.get(i);
        } else {
            arrayList = new ArrayList();
            this.groupedRegisteredReceives.put(i, arrayList);
        }
        arrayList.add(mPIReceiveRequests);
        return true;
    }

    public void close() {
        while (true) {
            if (this.pendingCloseRequests.isEmpty() && this.pendingSends.isEmpty() && this.waitForCompletionSends.isEmpty()) {
                break;
            } else {
                progress();
            }
        }
        for (int i = 0; i < this.registeredReceives.size(); i++) {
            try {
                IterativeLinkedList.ILLIterator it = this.registeredReceives.get(i).pendingRequests.iterator();
                while (it.hasNext()) {
                    ((MPIRequest) it.next()).request.cancel();
                }
            } catch (MPIException e) {
                LOG.log(Level.SEVERE, "Twister2Network failure", e);
                throw new RuntimeException("Twister2Network failure", e);
            }
        }
    }

    public boolean isComplete() {
        return this.pendingCloseRequests.isEmpty() && this.pendingSends.isEmpty() && this.waitForCompletionSends.isEmpty();
    }

    private void postMessage(MPISendRequests mPISendRequests) {
        ChannelMessage channelMessage = mPISendRequests.message;
        for (int i = 0; i < channelMessage.getNormalBuffers().size(); i++) {
            try {
                this.sendCount++;
                DataBuffer dataBuffer = (DataBuffer) channelMessage.getNormalBuffers().get(i);
                mPISendRequests.pendingSends.add(new MPIRequest(this.comm.iSend(dataBuffer.getByteBuffer(), dataBuffer.getSize(), MPI.BYTE, mPISendRequests.rank, channelMessage.getHeader().getEdge()), dataBuffer));
            } catch (MPIException e) {
                throw new RuntimeException("Failed to send message to rank: " + mPISendRequests.rank);
            }
        }
    }

    private void postReceive(MPIReceiveRequests mPIReceiveRequests) {
        DataBuffer poll = mPIReceiveRequests.availableBuffers.poll();
        while (true) {
            DataBuffer dataBuffer = poll;
            if (dataBuffer == null) {
                return;
            }
            this.pendingReceiveCount++;
            mPIReceiveRequests.pendingRequests.add(new MPIRequest(postReceive(mPIReceiveRequests.rank, mPIReceiveRequests.edge, dataBuffer), dataBuffer));
            poll = mPIReceiveRequests.availableBuffers.poll();
        }
    }

    private Request postReceive(int i, int i2, DataBuffer dataBuffer) {
        try {
            return this.comm.iRecv(dataBuffer.getByteBuffer(), dataBuffer.getCapacity(), MPI.BYTE, i, i2);
        } catch (MPIException e) {
            throw new RuntimeException("Failed to post the receive", e);
        }
    }

    public void progressSends() {
        while (this.pendingSends.size() > 0) {
            MPISendRequests poll = this.pendingSends.poll();
            if (poll != null) {
                postMessage(poll);
                this.waitForCompletionSends.add(poll);
            }
        }
        IterativeLinkedList.ILLIterator it = this.waitForCompletionSends.iterator();
        while (it.hasNext()) {
            MPISendRequests mPISendRequests = (MPISendRequests) it.next();
            IterativeLinkedList.ILLIterator it2 = mPISendRequests.pendingSends.iterator();
            while (it2.hasNext()) {
                try {
                    if (((MPIRequest) it2.next()).request.testStatus() == null) {
                        break;
                    } else {
                        it2.remove();
                    }
                } catch (MPIException e) {
                    throw new RuntimeException("Failed to complete the send to: " + mPISendRequests.rank, e);
                }
            }
            if (mPISendRequests.pendingSends.size() == 0) {
                mPISendRequests.callback.onSendComplete(mPISendRequests.rank, mPISendRequests.edge, mPISendRequests.message);
                it.remove();
            }
        }
    }

    public void progressReceives(int i) {
        progressInternalReceives((List) this.groupedRegisteredReceives.get(i));
        handlePendingCloseRequests();
    }

    private void progressInternalReceives(List<MPIReceiveRequests> list) {
        for (int i = 0; i < list.size(); i++) {
            MPIReceiveRequests mPIReceiveRequests = list.get(i);
            if (mPIReceiveRequests.availableBuffers.size() > 0) {
                postReceive(mPIReceiveRequests);
            }
            try {
                IterativeLinkedList.ILLIterator it = mPIReceiveRequests.pendingRequests.iterator();
                while (it.hasNext()) {
                    MPIRequest mPIRequest = (MPIRequest) it.next();
                    Status testStatus = mPIRequest.request.testStatus();
                    if (testStatus == null) {
                        break;
                    }
                    if (testStatus.isCancelled()) {
                        throw new RuntimeException("MPI receive request cancelled");
                    }
                    mPIRequest.buffer.setSize(testStatus.getCount(MPI.BYTE));
                    mPIReceiveRequests.callback.onReceiveComplete(mPIReceiveRequests.rank, mPIReceiveRequests.edge, mPIRequest.buffer);
                    this.pendingReceiveCount--;
                    it.remove();
                }
            } catch (MPIException e) {
                LOG.log(Level.SEVERE, "Twister2Network failure", e);
                throw new RuntimeException("Twister2Network failure", e);
            }
        }
    }

    public void progress() {
        progressSends();
        progressInternalReceives(this.registeredReceives);
        handlePendingCloseRequests();
    }

    private void handlePendingCloseRequests() {
        while (this.pendingCloseRequests.size() > 0) {
            Pair<Integer, Integer> remove = this.pendingCloseRequests.remove(0);
            Iterator<MPIReceiveRequests> it = this.registeredReceives.iterator();
            while (it.hasNext()) {
                MPIReceiveRequests next = it.next();
                if (next.edge == ((Integer) remove.getRight()).intValue() && next.rank == ((Integer) remove.getLeft()).intValue()) {
                    IterativeLinkedList.ILLIterator it2 = next.pendingRequests.iterator();
                    while (it2.hasNext()) {
                        try {
                            ((MPIRequest) it2.next()).request.cancel();
                            it2.remove();
                        } catch (MPIException e) {
                            LOG.log(Level.WARNING, String.format("MPI Receive cancel error: rank %d edge %d", remove.getLeft(), remove.getRight()));
                        }
                    }
                    it.remove();
                }
            }
        }
    }

    public ByteBuffer createBuffer(int i) {
        ByteBuffer newByteBuffer = MPI.newByteBuffer(i);
        this.buffers.add(newByteBuffer);
        return newByteBuffer;
    }

    public void releaseBuffers(int i, int i2) {
        this.pendingCloseRequests.add(new ImmutablePair(Integer.valueOf(i), Integer.valueOf(i2)));
    }

    public void reInit(List<JobMasterAPI.WorkerInfo> list) {
    }
}
