package edu.iu.dsc.tws.executor.comms.batch;

import edu.iu.dsc.tws.api.comms.BaseOperation;
import edu.iu.dsc.tws.api.comms.BulkReceiver;
import edu.iu.dsc.tws.api.comms.CommunicationContext;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.LogicalPlan;
import edu.iu.dsc.tws.api.comms.structs.Tuple;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskMessage;
import edu.iu.dsc.tws.api.compute.executor.ISync;
import edu.iu.dsc.tws.api.compute.graph.Edge;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.comms.batch.BJoin;
import edu.iu.dsc.tws.comms.selectors.HashingSelector;
import edu.iu.dsc.tws.executor.comms.AbstractParallelOperation;
import edu.iu.dsc.tws.executor.comms.DefaultDestinationSelector;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

/* loaded from: input_file:edu/iu/dsc/tws/executor/comms/batch/JoinBatchOperation.class */
public class JoinBatchOperation extends AbstractParallelOperation {
    protected BJoin op;
    private Edge leftEdge;
    private Edge rightEdge;
    private Set<Integer> finishedSources;

    /* loaded from: input_file:edu/iu/dsc/tws/executor/comms/batch/JoinBatchOperation$JoinRecvrImpl.class */
    private class JoinRecvrImpl implements BulkReceiver {
        private JoinRecvrImpl() {
        }

        public void init(Config config, Set<Integer> set) {
        }

        public boolean receive(int i, Iterator<Object> it) {
            TaskMessage taskMessage = new TaskMessage(it, JoinBatchOperation.this.inEdge, i);
            BlockingQueue blockingQueue = (BlockingQueue) JoinBatchOperation.this.outMessages.get(Integer.valueOf(i));
            if (blockingQueue != null) {
                return blockingQueue.offer(taskMessage);
            }
            throw new RuntimeException("Un-expected message for target: " + i);
        }

        public boolean sync(int i, byte[] bArr) {
            return ((ISync) JoinBatchOperation.this.syncs.get(Integer.valueOf(i))).sync(JoinBatchOperation.this.inEdge, bArr);
        }
    }

    public JoinBatchOperation(Config config, Communicator communicator, LogicalPlan logicalPlan, Set<Integer> set, Set<Integer> set2, Edge edge, Edge edge2) {
        super(config, communicator, logicalPlan, edge.getTargetEdge());
        this.finishedSources = new HashSet();
        this.leftEdge = edge;
        this.rightEdge = edge2;
        try {
            this.op = new BJoin(this.channel.newWithConfig(edge.getProperties()), this.logicalPlan, set, set2, edge.getKeyType(), edge.getDataType(), edge2.getDataType(), new JoinRecvrImpl(), edge.getPartitioner() != null ? new DefaultDestinationSelector(edge.getPartitioner()) : new HashingSelector(), ((Boolean) edge.getProperty("use-disk")).booleanValue(), (Comparator) edge.getProperty("key-comparator"), edge.getEdgeID().nextId(), edge2.getEdgeID().nextId(), (CommunicationContext.JoinType) edge.getProperty("join-type"), edge.getMessageSchema(), edge2.getMessageSchema());
        } catch (Exception e) {
            throw new RuntimeException("Unable to get properties", e);
        }
    }

    public boolean send(int i, IMessage iMessage, int i2) {
        TaskMessage taskMessage = (TaskMessage) iMessage;
        return iMessage.edge().equals(this.leftEdge.getName()) ? this.op.join(i, ((Tuple) taskMessage.getContent()).getKey(), ((Tuple) taskMessage.getContent()).getValue(), i2, 0) : this.op.join(i, ((Tuple) taskMessage.getContent()).getKey(), ((Tuple) taskMessage.getContent()).getValue(), i2, 1);
    }

    @Override // edu.iu.dsc.tws.executor.comms.AbstractParallelOperation
    public void finish(int i) {
        if (this.finishedSources.contains(Integer.valueOf(i))) {
            return;
        }
        super.finish(i);
        this.finishedSources.add(Integer.valueOf(i));
    }

    @Override // edu.iu.dsc.tws.executor.comms.AbstractParallelOperation
    public void close() {
        super.close();
        this.finishedSources.clear();
    }

    @Override // edu.iu.dsc.tws.executor.comms.AbstractParallelOperation
    public void reset() {
        super.reset();
        this.finishedSources.clear();
    }

    public BaseOperation getOp() {
        return this.op;
    }
}
