package edu.iu.dsc.tws.comms.dfw;

import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.channel.ChannelListener;
import edu.iu.dsc.tws.api.comms.channel.ChannelReceiver;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.comms.messaging.ChannelMessage;
import edu.iu.dsc.tws.api.comms.messaging.ChannelMessageReleaseCallback;
import edu.iu.dsc.tws.api.comms.messaging.MessageDirection;
import edu.iu.dsc.tws.api.comms.messaging.MessageHeader;
import edu.iu.dsc.tws.api.comms.messaging.types.EmptyType;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageType;
import edu.iu.dsc.tws.api.comms.messaging.types.MessageTypes;
import edu.iu.dsc.tws.api.comms.messaging.types.PrimitiveMessageTypes;
import edu.iu.dsc.tws.api.comms.packing.DataBuffer;
import edu.iu.dsc.tws.api.comms.packing.MessageDeSerializer;
import edu.iu.dsc.tws.api.comms.packing.MessageSerializer;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.dfw.InMessage;
import edu.iu.dsc.tws.comms.dfw.OutMessage;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/comms/dfw/ControlledChannelOperation.class */
public class ControlledChannelOperation implements ChannelListener, ChannelMessageReleaseCallback {
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(ControlledChannelOperation.class.getName());
    /** Path stamped on an outgoing message that has no external routes. */
    private static final int DEFAULT_PATH = -1;
    /** Configuration handed to the serializers and used to size the buffer pools. */
    private Config config;
    /** Plan used to obtain this worker's id and to map logical ids to workers. */
    private LogicalPlan instancePlan;
    /** Edge passed to the channel when registering receives, building messages and releasing buffers. */
    private int edge;
    /** Underlying channel used to create buffers and to send/receive messages. */
    private TWSChannel channel;
    /** De-serializers, looked up by the key read from a received buffer and by receive id. */
    private Map<Integer, MessageDeSerializer> messageDeSerializer;
    /** Serializers, looked up by send id. */
    private Map<Integer, MessageSerializer> messageSerializer;
    /** Data type attached to outgoing messages. */
    private MessageType dataType;
    /** Key type attached to outgoing messages. */
    private MessageType keyType;
    /** Data type attached to incoming messages. */
    private MessageType receiveDataType;
    /** Key type attached to incoming messages when {@link #isKeyed} is set. */
    private MessageType receiveKeyType;
    /** Whether a key type is set on incoming messages. */
    private boolean isKeyed;
    /** This worker's id as reported by the plan; used in error messages. */
    private int executor;
    /** Pool of send buffers shared with the serializers. */
    private Queue<DataBuffer> sendBuffers;
    /** Per-worker queues of receive buffers registered with the channel. */
    private Map<Integer, Queue<DataBuffer>> receiveBuffers;
    /** Bounded queues of outgoing messages, one per source. */
    private Map<Integer, ArrayBlockingQueue<OutMessage>> pendingSendMessagesPerSource;
    /** Incoming messages queued per header source id once de-serialization has started. */
    private Map<Integer, Queue<InMessage>> pendingReceiveMessagesPerSource;
    /** Incoming messages waiting to be de-serialized. */
    private Map<Integer, Queue<InMessage>> pendingReceiveDeSerializations;
    /** Ids whose channel buffers are released in {@link #close()}. */
    private Set<Integer> receivingExecutors;
    /** Receiver notified of internal deliveries and completed sends. */
    private ChannelReceiver receiver;
    /** Chooses which send queue {@link #progress()} services next. */
    private ProgressionTracker sendProgressTracker;
    /** Chooses which receive id {@link #progress()} services next within the active group. */
    private ControlledProgressTracker receiveProgressTracker;
    /** Pool of receive buffers handed out in {@link #startGroup} and refilled on release. */
    private Queue<DataBuffer> freeReceiveBuffers;
    /** Groups given to {@link #setupReceiveGroups}; stored but not read in this class. */
    private List<IntArrayList> receiveIdGroups;
    /** Sending groups supplied at construction; stored but not read in this class. */
    private List<IntArrayList> sendingGroupsTargets;
    /** Receive groups (by source) used to size the receive pool and drive the receive tracker. */
    private List<IntArrayList> receiveGroupsSources;
    /** Expected receive counts per worker for the group started last. */
    private Map<Integer, Integer> expectedReceivePerWorker;
    /** Receive groups expressed as worker ids. */
    private List<IntArrayList> receiveGroupsWorkers;
    /** Guards calls into the receiver and the external send loop. */
    private final Lock lock = new ReentrantLock();
    /** Messages that still need more buffers, keyed by the leading int of their buffers. */
    private final Map<Integer, InMessage> currentMessages = new HashMap<Integer, InMessage>();
    /** Number of external sends handed to the channel and not yet reported complete. */
    private final AtomicInteger externalSendsPending = new AtomicInteger(0);
    /** Per-worker counters reset to zero when a group starts. */
    private final Map<Integer, Integer> currentReceives = new HashMap<Integer, Integer>();
    /** Count of receive buffers returned to the free pool. */
    private int addedFreedBuffers = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wires the operation to its channel, allocates the send-buffer pool, registers the
     * receives, initialises the (de)serializers, sizes the receive-buffer pool and creates
     * the progress trackers — in that order.
     *
     * @param tWSChannel channel used for buffers, sends and receives
     * @param config configuration used for buffer counts/sizes and serializer setup
     * @param messageType data type of outgoing messages
     * @param messageType2 data type of incoming messages
     * @param messageType3 key type of outgoing messages
     * @param messageType4 key type of incoming messages
     * @param logicalPlan plan providing this worker's id
     * @param i edge of this operation
     * @param set ids whose channel buffers are released on close
     * @param channelReceiver receiver notified about deliveries and completed sends
     * @param map pending send queues per source
     * @param map2 pending receive queues per source
     * @param map3 pending de-serialization queues
     * @param map4 serializers
     * @param map5 de-serializers
     * @param z whether messages are keyed
     * @param list sending groups
     * @param list2 receive groups by source
     * @param list3 receive groups by worker
     */
    public ControlledChannelOperation(TWSChannel tWSChannel, Config config, MessageType messageType, MessageType messageType2, MessageType messageType3, MessageType messageType4, LogicalPlan logicalPlan, int i, Set<Integer> set, ChannelReceiver channelReceiver, Map<Integer, ArrayBlockingQueue<OutMessage>> map, Map<Integer, Queue<InMessage>> map2, Map<Integer, Queue<InMessage>> map3, Map<Integer, MessageSerializer> map4, Map<Integer, MessageDeSerializer> map5, boolean z, List<IntArrayList> list, List<IntArrayList> list2, List<IntArrayList> list3) {
        // Defaults, overwritten further down by the caller-supplied values.
        this.keyType = MessageTypes.BYTE;
        this.isKeyed = false;

        // Collaborators and plan.
        this.channel = tWSChannel;
        this.config = config;
        this.instancePlan = logicalPlan;
        this.edge = i;

        // Message typing.
        this.dataType = messageType;
        this.receiveDataType = messageType2;
        this.receiveKeyType = messageType4;
        this.keyType = messageType3;

        this.executor = this.instancePlan.getThisWorker();
        this.receivingExecutors = set;
        this.receiver = channelReceiver;
        this.isKeyed = z;

        // Queues and (de)serializers owned by the caller.
        this.pendingReceiveMessagesPerSource = map2;
        this.pendingSendMessagesPerSource = map;
        this.pendingReceiveDeSerializations = map3;
        this.messageSerializer = map4;
        this.messageDeSerializer = map5;

        // Group layout.
        this.sendingGroupsTargets = list;
        this.receiveGroupsSources = list2;
        this.receiveGroupsWorkers = list3;

        // Fill the send-buffer pool to its capacity.
        final int poolSize = CommunicationContext.sendBuffersCount(this.config);
        final int bytesPerBuffer = CommunicationContext.bufferSize(this.config);
        this.sendBuffers = new ArrayBlockingQueue<DataBuffer>(poolSize);
        int created = 0;
        while (created < poolSize) {
            this.sendBuffers.offer(new DataBuffer(tWSChannel.createBuffer(bytesPerBuffer)));
            created++;
        }
        this.receiveBuffers = new HashMap<Integer, Queue<DataBuffer>>();

        LOG.log(Level.FINE, String.format("%d setup communication", this.instancePlan.getThisWorker()));
        setupCommunication();

        LOG.fine(String.format("%d setup initializers", this.instancePlan.getThisWorker()));
        initSerializers();

        setupReceiveGroups(this.receiveGroupsSources);
        initProgressTrackers();
    }

    /**
     * Initialises every serializer with the configuration and the shared send-buffer pool,
     * then every de-serializer with the configuration.
     */
    private void initSerializers() {
        for (MessageSerializer serializer : this.messageSerializer.values()) {
            serializer.init(this.config, this.sendBuffers);
        }
        for (MessageDeSerializer deSerializer : this.messageDeSerializer.values()) {
            deSerializer.init(this.config);
        }
    }

    /**
     * Creates the send tracker over the ids that have a pending-send queue and the receive
     * tracker over the receive groups.
     */
    private void initProgressTrackers() {
        Set<Integer> sendIds = this.pendingSendMessagesPerSource.keySet();
        this.sendProgressTracker = new ProgressionTracker(sendIds);
        this.receiveProgressTracker = new ControlledProgressTracker(this.receiveGroupsSources);
    }

    /**
     * Registers this operation with the channel for every worker of every receive group and
     * remembers the buffer queue handed to the channel for that worker.
     *
     * <p>The send-buffer pool is NOT touched here: the constructor has already filled the
     * bounded {@code sendBuffers} queue to capacity before calling this method, so creating
     * further channel buffers here only produced buffers that were rejected by
     * {@code offer} and dropped.
     */
    private void setupCommunication() {
        for (int group = 0; group < this.receiveGroupsWorkers.size(); group++) {
            for (Integer worker : this.receiveGroupsWorkers.get(group)) {
                LinkedBlockingQueue<DataBuffer> workerBuffers = new LinkedBlockingQueue<DataBuffer>();
                LOG.fine(this.instancePlan.getThisWorker() + " Register to receive from: " + worker);
                // The group index is passed to the channel together with the worker and edge.
                this.channel.receiveMessage(group, worker.intValue(), this.edge, this, workerBuffers);
                this.receiveBuffers.put(worker, workerBuffers);
            }
        }
    }

    /**
     * Records the receive groups and (re)creates the pool of free receive buffers, holding
     * one buffer per member of the largest group.
     *
     * <p>When {@code list} is empty, or contains only empty groups, the pool is created
     * with the minimum legal capacity of one and no buffers, instead of failing inside the
     * {@link ArrayBlockingQueue} constructor, which rejects capacities below one.
     *
     * @param list receive groups; must not be {@code null}
     */
    public void setupReceiveGroups(List<IntArrayList> list) {
        this.receiveIdGroups = list;

        int largestGroup = 0;
        for (IntArrayList group : list) {
            largestGroup = Math.max(largestGroup, group.size());
        }

        int bufferSize = CommunicationContext.bufferSize(this.config);
        this.freeReceiveBuffers = new ArrayBlockingQueue<DataBuffer>(Math.max(1, largestGroup));
        for (int created = 0; created < largestGroup; created++) {
            this.freeReceiveBuffers.offer(new DataBuffer(this.channel.createBuffer(bufferSize)));
        }
    }

    /**
     * Switches the receive tracker to group {@code i} and prepares that group's workers:
     * each worker expected to deliver at least one message is given one buffer from the
     * free pool, and every worker's receive counter is reset to zero.
     *
     * @param i index of the receive group to activate
     * @param i2 not used by this method
     * @param map expected receive count per worker id; must contain every worker of the group
     * @throws RuntimeException if the free receive-buffer pool is exhausted
     */
    public void startGroup(int i, int i2, Map<Integer, Integer> map) {
        this.receiveProgressTracker.switchGroup(i);
        IntArrayList groupWorkers = this.receiveGroupsWorkers.get(i);
        this.expectedReceivePerWorker = map;
        this.currentReceives.clear();

        for (int pos = 0; pos < groupWorkers.size(); pos++) {
            Integer worker = groupWorkers.get(pos);
            boolean expectsData = map.get(worker).intValue() > 0;
            if (expectsData) {
                Queue<DataBuffer> workerBuffers = this.receiveBuffers.get(worker);
                DataBuffer free = this.freeReceiveBuffers.poll();
                if (free == null) {
                    throw new RuntimeException("Buffer cannot be null");
                }
                workerBuffers.offer(free);
            }
            this.currentReceives.put(worker, 0);
        }
    }

    /**
     * Queues a message for sending on behalf of source {@code i}.
     *
     * @param i source id; must have a pending-send queue
     * @param obj payload
     * @param i2 forwarded unchanged to the outgoing message
     * @param i3 forwarded unchanged to the outgoing message
     * @param routingParameters internal/external routes and destination
     * @return {@code true} if the message was queued, {@code false} if the queue is full
     * @throws RuntimeException if no pending-send queue exists for {@code i}
     */
    public boolean sendMessage(int i, Object obj, int i2, int i3, RoutingParameters routingParameters) {
        ArrayBlockingQueue<OutMessage> pending = this.pendingSendMessagesPerSource.get(Integer.valueOf(i));
        if (pending != null) {
            return offerForSend(i, obj, i2, i3, routingParameters, pending);
        }
        throw new RuntimeException(String.format("%d No send messages %d", this.executor, i));
    }

    /**
     * Handles a buffer that the channel has finished receiving.
     *
     * <p>The int at offset 0 of the buffer is used as the message key (presumably the
     * logical source id — confirm against the serializer). If a message with that key is
     * still being assembled, the buffer is appended to it. Otherwise a new
     * {@link InMessage} is created from the header, remembered if it needs more buffers,
     * and queued for de-serialization straight away.
     *
     * @param i id passed through to the new {@link InMessage}
     * @param i2 value passed to the de-serializer when building the header
     * @param dataBuffer the received buffer
     * @throws RuntimeException if the de-serialization queue rejects the new message
     */
    public void onReceiveComplete(int i, int i2, DataBuffer dataBuffer) {
        // Limit the byte buffer to the received size and rewind it for reading.
        ByteBuffer bytes = dataBuffer.getByteBuffer();
        bytes.position(dataBuffer.getSize());
        bytes.flip();

        Integer messageKey = Integer.valueOf(bytes.getInt(0));

        InMessage inFlight = this.currentMessages.get(messageKey);
        if (inFlight != null) {
            // Continuation buffer: forget the message once it reports itself complete.
            if (inFlight.addBufferAndCalculate(dataBuffer)) {
                this.currentMessages.remove(messageKey);
            }
            return;
        }

        MessageHeader header = this.messageDeSerializer.get(messageKey).buildHeader(dataBuffer, i2);

        // Declared as MessageType: the fields are MessageType and may be swapped for the
        // byte-array/empty types below.
        MessageType incomingDataType = this.receiveDataType;
        MessageType incomingKeyType = this.receiveKeyType;
        // Bit 25 of the header flags switches the message to raw bytes with an empty key.
        // NOTE(review): presumably a control/barrier flag — confirm against the flag constants.
        if ((header.getFlags() & 33554432) == 33554432) {
            incomingDataType = MessageTypes.BYTE_ARRAY;
            incomingKeyType = MessageTypes.EMPTY;
        }

        InMessage message = new InMessage(i, incomingDataType, this, header);
        if (this.isKeyed) {
            message.setKeyType(incomingKeyType);
        }
        if (!message.addBufferAndCalculate(dataBuffer)) {
            // More buffers are needed; keep the message so later buffers can find it.
            this.currentMessages.put(messageKey, message);
        }

        // Queue immediately, even when incomplete, so it can be de-serialized as buffers arrive.
        Queue<InMessage> deserializeQueue = this.pendingReceiveDeSerializations.get(messageKey);
        if (!deserializeQueue.offer(message)) {
            throw new RuntimeException(this.executor + " We should have enough space: " + deserializeQueue.size());
        }
    }

    /**
     * Reports whether all sending work has drained.
     *
     * @return {@code true} when every pending-send queue is empty and no external send is
     *         still awaiting its completion callback
     */
    public boolean isComplete() {
        for (ArrayBlockingQueue<OutMessage> pending : this.pendingSendMessagesPerSource.values()) {
            if (!pending.isEmpty()) {
                return false;
            }
        }
        return this.externalSendsPending.get() == 0;
    }

    /**
     * Performs one round of work: services at most one send id and then at most one
     * receive id, each chosen by its tracker. A tracker answering
     * {@code Integer.MIN_VALUE} means there is nothing to service on that side.
     */
    public void progress() {
        if (this.sendProgressTracker.canProgress()) {
            int sendId = this.sendProgressTracker.next();
            if (sendId != Integer.MIN_VALUE) {
                sendProgress(sendId);
                this.sendProgressTracker.finish(sendId);
            }
        }

        if (this.receiveProgressTracker.canProgress()) {
            int receiveId = this.receiveProgressTracker.next();
            if (receiveId != Integer.MIN_VALUE) {
                // De-serialize first, then hand completed messages onward.
                receiveDeserializeProgress(receiveId);
                receiveProgress(receiveId);
                this.receiveProgressTracker.finish(receiveId);
            }
        }
    }

    /**
     * Wraps the payload in an {@link OutMessage} and offers it to the given queue.
     *
     * <p>The message path is the routing destination when there is at least one external
     * route and {@link #DEFAULT_PATH} otherwise.
     *
     * @return {@code false} if the queue has no free slot or rejects the message
     */
    private boolean offerForSend(int i, Object obj, int i2, int i3, RoutingParameters routingParameters, ArrayBlockingQueue<OutMessage> arrayBlockingQueue) {
        boolean queueFull = arrayBlockingQueue.remainingCapacity() <= 0;
        if (queueFull) {
            return false;
        }
        int path = routingParameters.getExternalRoutes().size() > 0
            ? routingParameters.getDestinationId()
            : DEFAULT_PATH;
        OutMessage message = new OutMessage(i, this.edge, path, i2, i3, routingParameters.getInternalRoutes(), routingParameters.getExternalRoutes(), this.dataType, this.keyType, this, obj);
        return arrayBlockingQueue.offer(message);
    }

    /**
     * Drains the pending-send queue of send id {@code i} as far as possible.
     *
     * <p>For the message at the head of the queue: deliver it to its internal targets;
     * if it has no external targets it is done. Otherwise ask the serializer for the next
     * channel message, and push the oldest un-sent channel message to the external targets.
     * The loop stops as soon as an internal or external target refuses a delivery, when
     * there is nothing serialized to send, or when the message is in a state other than
     * {@code SERIALIZED} / {@code PARTIALLY_SERIALIZED}.
     *
     * @param i send id whose queue is serviced
     */
    public void sendProgress(int i) {
        ArrayBlockingQueue<OutMessage> pending = this.pendingSendMessagesPerSource.get(Integer.valueOf(i));
        while (pending.size() > 0) {
            OutMessage head = pending.peek();

            // Internal targets first; stop if one of them cannot accept right now.
            if (!sendInternally(head, head.getData())) {
                break;
            }

            // Nothing to send over the channel: the message is finished.
            if (head.getExternalSends().size() == 0) {
                pending.poll();
                continue;
            }

            // Serialize the next piece (if any) and queue it behind earlier pieces.
            Queue<ChannelMessage> serialized = head.getChannelMessages();
            ChannelMessage built = this.messageSerializer.get(Integer.valueOf(i)).build(head.getData(), head);
            if (built != null) {
                serialized.offer(built);
            }
            ChannelMessage front = serialized.peek();
            if (front == null) {
                return;
            }

            List<Integer> targets = head.getExternalSends();
            OutMessage.SendState state = head.getSendState();
            boolean fullySerialized = state == OutMessage.SendState.SERIALIZED;
            if (!fullySerialized && state != OutMessage.SendState.PARTIALLY_SERIALIZED) {
                return;
            }

            // Resume from the first target that has not accepted this channel message yet.
            boolean allAccepted = sendExternally(head, front, targets, front.getAcceptedExternalSends());
            if (front.getAcceptedExternalSends() == targets.size()) {
                if (fullySerialized) {
                    // Last piece delivered everywhere: retire the message and notify.
                    pending.poll();
                    serialized.poll();
                    this.receiver.sendCompleted(head);
                } else {
                    // More pieces to come: only retire this piece.
                    serialized.poll();
                }
            }
            if (!allAccepted) {
                break;
            }
        }
    }

    /**
     * Hands a channel message to its external targets, starting at index {@code i} of
     * {@code list}, while holding the operation lock.
     *
     * <p>On first use the message's reference count is raised by the number of external
     * sends. Each accepted target bumps the message's accepted counter and the count of
     * outstanding external sends.
     *
     * @return {@code true} if every remaining target accepted, {@code false} as soon as
     *         one target refuses
     */
    private boolean sendExternally(OutMessage outMessage, ChannelMessage channelMessage, List<Integer> list, int i) {
        this.lock.lock();
        try {
            if (!channelMessage.isOutCountUpdated()) {
                channelMessage.incrementRefCount(outMessage.getExternalSends().size());
                channelMessage.setOutCountUpdated(true);
            }
            for (int idx = i; idx < list.size(); idx++) {
                int target = list.get(idx).intValue();
                if (!sendMessageToTarget(channelMessage, target)) {
                    return false;
                }
                channelMessage.incrementAcceptedExternalSends();
                this.externalSendsPending.incrementAndGet();
            }
            return true;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Delivers a message to its internal targets, resuming after the targets that have
     * already accepted it. Only messages in state {@code INIT} are processed; any other
     * state is treated as already delivered internally.
     *
     * <p>The lock is held only around the call into the receiver and is released exactly
     * once per iteration via {@code finally}. Code that runs after the lock is released
     * sits outside the {@code try}, so an exception there can no longer trigger a second
     * {@code unlock()} and an {@link IllegalMonitorStateException} that would hide the
     * original failure.
     *
     * @param outMessage message to deliver
     * @param obj payload passed to the receiver
     * @return {@code true} if all internal targets accepted (or none were pending),
     *         {@code false} if a target refused and the send must be retried later
     */
    private boolean sendInternally(OutMessage outMessage, Object obj) {
        if (outMessage.getSendState() != OutMessage.SendState.INIT) {
            return true;
        }

        int firstPending = outMessage.getAcceptedInternalSends();
        List<Integer> internalSends = outMessage.getInternalSends();
        for (int idx = firstPending; idx < outMessage.getInternalSends().size(); idx++) {
            boolean accepted;
            this.lock.lock();
            try {
                accepted = this.receiver.receiveSendInternally(outMessage.getSource(), internalSends.get(idx).intValue(), outMessage.getTarget(), outMessage.getFlags(), obj);
            } finally {
                this.lock.unlock();
            }
            if (!accepted) {
                return false;
            }
            // Notify and record progress for every accepted internal target.
            this.receiver.sendCompleted(outMessage);
            outMessage.incrementAcceptedInternalSends();
        }

        outMessage.setSendState(OutMessage.SendState.SENT_INTERNALLY);
        return true;
    }

    /**
     * Advances de-serialization of the message at the head of the queue for id {@code i}.
     *
     * <p>A message seen for the first time ({@code INIT}) is also placed on the
     * pending-receive queue of its header source and moved to {@code BUILDING}. The
     * de-serializer is then run; once the number of unpacked objects equals the absolute
     * tuple count of the header the message becomes {@code BUILT}. Messages that are
     * {@code BUILT}, {@code RECEIVE} or {@code DONE} are removed from the
     * de-serialization queue.
     *
     * @param i receive id whose de-serialization queue is serviced
     * @throws RuntimeException if the pending-receive queue rejects the message
     */
    public void receiveDeserializeProgress(int i) {
        Integer receiveKey = Integer.valueOf(i);
        Queue<InMessage> toDeserialize = this.pendingReceiveDeSerializations.get(receiveKey);
        InMessage head = toDeserialize.peek();
        if (head == null) {
            return;
        }

        InMessage.ReceivedState stateBefore = head.getReceivedState();
        boolean firstVisit = stateBefore == InMessage.ReceivedState.INIT;
        if (firstVisit || stateBefore == InMessage.ReceivedState.BUILDING) {
            if (firstVisit) {
                Integer sourceKey = Integer.valueOf(head.getHeader().getSourceId());
                Queue<InMessage> perSource = this.pendingReceiveMessagesPerSource.get(sourceKey);
                if (!perSource.offer(head)) {
                    throw new RuntimeException(this.executor + " We should have enough space: " + perSource.size());
                }
                head.setReceivedState(InMessage.ReceivedState.BUILDING);
            }

            this.messageDeSerializer.get(receiveKey).build(head, head.getHeader().getEdge());

            boolean everythingUnpacked = head.getUnPkNumberObjects() == Math.abs(head.getHeader().getNumberTuples());
            if (everythingUnpacked) {
                head.setReceivedState(InMessage.ReceivedState.BUILT);
            }
        }

        // Re-read the state: it may have changed above.
        InMessage.ReceivedState stateAfter = head.getReceivedState();
        if (stateAfter == InMessage.ReceivedState.BUILT
            || stateAfter == InMessage.ReceivedState.RECEIVE
            || stateAfter == InMessage.ReceivedState.DONE) {
            toDeserialize.poll();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:60:0x0179, code lost:
    
        if (r0 != edu.iu.dsc.tws.comms.dfw.InMessage.ReceivedState.BUILT) goto L44;
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x0186, code lost:
    
        if (r0.getBuiltMessages().size() != 0) goto L44;
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x018a, code lost:
    
        if (r7 == false) goto L44;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x018d, code lost:
    
        r0.setReceivedState(edu.iu.dsc.tws.comms.dfw.InMessage.ReceivedState.RECEIVE);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the receive-side progress step for receive id {@code r5}.
     *
     * <p>The real body could not be recovered by the decompiler, so this method always
     * throws. {@link #progress()} calls it right after
     * {@link #receiveDeserializeProgress(int)} with the same id, which means
     * {@code progress()} fails whenever the receive tracker yields an id.
     *
     * <p>The only surviving fragments (see the JADX notes above) show that a message in
     * state {@code BUILT} with no remaining built messages is moved to state
     * {@code RECEIVE} when a boolean flag is set. NOTE(review): the remaining logic must be
     * restored from the original sources or bytecode before this class can be used.
     *
     * @param r5 receive id selected by the receive progress tracker
     * @throws UnsupportedOperationException always
     */
    public void receiveProgress(int r5) {
        /*
            Method dump skipped, instructions count: 511
            To view this dump add '--comments-level debug' option
        */
        // Fails loudly instead of silently skipping the lost receive logic.
        throw new UnsupportedOperationException("Method not decompiled: edu.iu.dsc.tws.comms.dfw.ControlledChannelOperation.receiveProgress(int):void");
    }

    /**
     * Sends a channel message to the worker that the plan maps logical id {@code i} to,
     * registering this operation as the completion listener.
     *
     * @return whatever the channel reports for the send attempt
     */
    private boolean sendMessageToTarget(ChannelMessage channelMessage, int i) {
        int targetWorker = this.instancePlan.getWorkerForForLogicalId(i);
        return this.channel.sendMessage(targetWorker, channelMessage, this);
    }

    /**
     * Returns the buffers of a channel message to their pool, but only once the message
     * reports that it is done processing.
     *
     * @param channelMessage message being released
     */
    public void release(ChannelMessage channelMessage) {
        if (!channelMessage.doneProcessing()) {
            return;
        }
        releaseTheBuffers(channelMessage.getOriginatingId(), channelMessage);
    }

    /**
     * Completion callback for an external send: releases the channel message and lowers
     * the count of outstanding external sends by one.
     *
     * @param i not used by this method
     * @param i2 not used by this method
     * @param channelMessage the message whose send finished
     */
    public void onSendComplete(int i, int i2, ChannelMessage channelMessage) {
        channelMessage.release();
        this.externalSendsPending.decrementAndGet();
    }

    /**
     * Clears every normal buffer of the message and puts it back into the pool matching
     * the message direction: the free receive pool for {@code IN}, the send pool for
     * {@code OUT}. Messages with any other direction are left untouched.
     *
     * @param i not used by this method
     * @param channelMessage message whose buffers are recycled
     * @throws RuntimeException if a pool refuses a returned buffer
     */
    private void releaseTheBuffers(int i, ChannelMessage channelMessage) {
        MessageDirection direction = channelMessage.getMessageDirection();
        if (MessageDirection.IN == direction) {
            for (DataBuffer received : channelMessage.getNormalBuffers()) {
                received.getByteBuffer().clear();
                this.addedFreedBuffers++;
                boolean returned = this.freeReceiveBuffers.offer(received);
                if (!returned) {
                    throw new RuntimeException(String.format("%d Buffer release failed for target %d", this.executor, channelMessage.getHeader().getDestinationIdentifier()));
                }
            }
        } else if (MessageDirection.OUT == direction) {
            // The send pool is viewed as its concrete type to report remaining capacity on failure.
            ArrayBlockingQueue<DataBuffer> sendPool = (ArrayBlockingQueue<DataBuffer>) this.sendBuffers;
            for (DataBuffer sent : channelMessage.getNormalBuffers()) {
                sent.getByteBuffer().clear();
                boolean returned = sendPool.offer(sent);
                if (!returned) {
                    throw new RuntimeException(String.format("%d Buffer release failed for source %d %d %d", this.executor, channelMessage.getOriginatingId(), sendPool.size(), sendPool.remainingCapacity()));
                }
            }
        }
    }

    /**
     * @return the logical plan this operation was created with
     */
    public LogicalPlan getInstancePlan() {
        return instancePlan;
    }

    /**
     * @return the configuration this operation was created with
     */
    public Config getConfig() {
        return config;
    }

    /**
     * Replaces the key type attached to messages queued for sending from now on.
     *
     * @param messageType new key type for outgoing messages
     */
    public void setKeyType(MessageType messageType) {
        keyType = messageType;
    }

    /**
     * Asks the channel to release the buffers held for every receiving executor on this
     * operation's edge.
     */
    public void close() {
        for (Integer receivingExecutor : this.receivingExecutors) {
            this.channel.releaseBuffers(receivingExecutor.intValue(), this.edge);
        }
    }
}
