package edu.iu.dsc.tws.executor.core.streaming;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.OutputCollection;
import edu.iu.dsc.tws.api.compute.executor.ExecutorContext;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.compute.executor.ISync;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Closable;
import edu.iu.dsc.tws.api.compute.nodes.ICompute;
import edu.iu.dsc.tws.api.compute.nodes.INode;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.checkpointing.api.SnapshotImpl;
import edu.iu.dsc.tws.checkpointing.task.CheckpointableTask;
import edu.iu.dsc.tws.checkpointing.task.CheckpointingSGatherSink;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingContext;
import edu.iu.dsc.tws.executor.core.DefaultOutputCollection;
import edu.iu.dsc.tws.executor.core.TaskCheckpointUtils;
import edu.iu.dsc.tws.executor.core.TaskContextImpl;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/core/streaming/TaskStreamingInstance.class */
/**
 * Streaming node instance that wraps a single {@link ICompute} task.
 *
 * <p>Each call to {@link #execute()} feeds queued input messages to the task, hands the task's
 * queued output to the outgoing parallel operation registered for each message's edge, and
 * progresses every registered incoming and outgoing operation. When the task is a
 * {@link CheckpointableTask} and checkpointing is enabled in the configuration, the instance
 * also drives a {@link PendingCheckpoint} and emits barrier messages on its output edges.
 */
public class TaskStreamingInstance implements INodeInstance, ISync {
    private static final Logger LOG = Logger.getLogger(TaskStreamingInstance.class.getName());

    /**
     * Flag bit that marks an outgoing message as a barrier; such messages are sent through
     * {@code sendBarrier} instead of {@code send}.
     * NOTE(review): value 1 << 25 presumably mirrors the framework's sync-barrier message
     * flag - confirm against the messaging flag definitions.
     */
    private static final int SYNC_BARRIER_FLAG = 33554432;

    /** Edge name on which the id of a completed checkpoint is written. */
    private static final String FT_GATHER_EDGE = "ft-gather-edge";

    protected ICompute task;
    protected BlockingQueue<IMessage> inQueue;
    protected BlockingQueue<IMessage> outQueue;
    protected Config config;
    protected OutputCollection outputCollection;
    protected int globalTaskId;
    protected int taskId;
    protected int taskIndex;
    protected int parallelism;
    protected String taskName;
    protected Map<String, Object> nodeConfigs;
    protected Map<String, IParallelOperation> outParOps = new HashMap<>();
    protected Map<String, IParallelOperation> inParOps = new HashMap<>();
    protected int workerId;
    protected int lowWaterMark;
    protected int highWaterMark;
    protected Map<String, String> outputEdges;
    protected TaskSchedulePlan taskSchedule;
    protected Map<String, Set<String>> inputEdges;

    private final CheckpointingClient checkpointingClient;
    private final String taskGraphName;
    private final long tasksVersion;
    private final boolean checkpointable;
    private final SnapshotImpl snapshot;
    private final boolean ftGatherTask;
    /** The task viewed as a gather sink; {@code null} unless {@link #ftGatherTask} is true. */
    private final CheckpointingSGatherSink checkpointingSGatherSink;

    private StateStore stateStore;
    private IParallelOperation[] intOpArray;
    private String[] inEdgeArray;
    private IParallelOperation[] outOpArray;
    private String[] outEdgeArray;
    private PendingCheckpoint pendingCheckpoint;
    private TaskContextImpl taskContext;

    /**
     * Creates the instance and captures its wiring; no task code runs until
     * {@link #prepare(Config)} is called.
     *
     * @param iCompute the task to run
     * @param blockingQueue input queue, polled by {@link #execute()}
     * @param blockingQueue2 output queue, wrapped by the task's output collection
     * @param config configuration used for the queue water marks and the checkpointing switch
     * @param str task name
     * @param i task id
     * @param i2 global task id
     * @param i3 task index
     * @param i4 parallelism
     * @param i5 worker id
     * @param map node configuration passed on to the task context
     * @param map2 input edges
     * @param map3 output edges; their keys are the edges barriers are written to
     * @param taskSchedulePlan schedule plan passed on to the task context
     * @param checkpointingClient client used to report version updates
     * @param str2 task graph name
     * @param j tasks version handed to the restore step in {@link #prepare(Config)}
     */
    public TaskStreamingInstance(ICompute iCompute, BlockingQueue<IMessage> blockingQueue, BlockingQueue<IMessage> blockingQueue2, Config config, String str, int i, int i2, int i3, int i4, int i5, Map<String, Object> map, Map<String, Set<String>> map2, Map<String, String> map3, TaskSchedulePlan taskSchedulePlan, CheckpointingClient checkpointingClient, String str2, long j) {
        this.task = iCompute;
        this.inQueue = blockingQueue;
        this.outQueue = blockingQueue2;
        this.config = config;
        this.globalTaskId = i2;
        this.taskId = i;
        this.taskIndex = i3;
        this.parallelism = i4;
        this.taskName = str;
        this.nodeConfigs = map;
        this.workerId = i5;
        this.lowWaterMark = ExecutorContext.instanceQueueLowWaterMark(config);
        this.highWaterMark = ExecutorContext.instanceQueueHighWaterMark(config);
        this.inputEdges = map2;
        this.outputEdges = map3;
        this.taskSchedule = taskSchedulePlan;
        this.checkpointingClient = checkpointingClient;
        this.taskGraphName = str2;
        this.tasksVersion = j;
        this.checkpointable = (this.task instanceof CheckpointableTask) && CheckpointingContext.isCheckpointingEnabled(config);
        this.ftGatherTask = this.task instanceof CheckpointingSGatherSink;
        // The narrowing cast is required: the field is typed more specifically than ICompute.
        this.checkpointingSGatherSink = this.ftGatherTask ? (CheckpointingSGatherSink) this.task : null;
        this.snapshot = new SnapshotImpl();
    }

    /**
     * Builds the task context, prepares the task, snapshots the registered operations and edge
     * names into arrays, and - for checkpointable tasks - initialises the state store, creates
     * the pending checkpoint and restores the task state.
     *
     * <p>Parallel operations must be registered before this call; operations registered later
     * are not included in the arrays that {@link #execute()} progresses.
     *
     * @param config configuration handed to the task's own {@code prepare}
     */
    public void prepare(Config config) {
        this.outputCollection = new DefaultOutputCollection(this.outQueue);
        this.taskContext = new TaskContextImpl(this.taskIndex, this.taskId, this.globalTaskId, this.taskName, this.parallelism, this.workerId, this.outputCollection, this.nodeConfigs, this.inputEdges, this.outputEdges, this.taskSchedule, OperationMode.STREAMING);
        this.task.prepare(config, this.taskContext);

        this.outOpArray = this.outParOps.values().toArray(new IParallelOperation[0]);
        this.outEdgeArray = this.outputEdges.keySet().toArray(new String[0]);
        this.intOpArray = this.inParOps.values().toArray(new IParallelOperation[0]);
        this.inEdgeArray = this.inputEdges.keySet().toArray(new String[0]);

        if (this.checkpointable) {
            // Note: the state store is configured from the constructor's config, not the argument.
            this.stateStore = CheckpointUtils.getStateStore(this.config);
            this.stateStore.init(this.config, new String[]{this.taskGraphName, String.valueOf(this.globalTaskId)});
            this.pendingCheckpoint = new PendingCheckpoint(this.taskGraphName, this.task, this.globalTaskId, this.intOpArray, this.inEdgeArray.length, this.checkpointingClient, this.stateStore, this.snapshot);
            TaskCheckpointUtils.restore(this.task, this.snapshot, this.stateStore, this.tasksVersion, this.globalTaskId);
        }
    }

    /**
     * Registers the outgoing operation used for messages written to the given edge.
     *
     * @param str edge name
     * @param iParallelOperation operation that sends messages for that edge
     */
    public void registerOutParallelOperation(String str, IParallelOperation iParallelOperation) {
        this.outParOps.put(str, iParallelOperation);
    }

    /**
     * Registers the incoming operation associated with the given edge.
     *
     * @param str edge name
     * @param iParallelOperation operation that receives messages for that edge
     */
    public void registerInParallelOperation(String str, IParallelOperation iParallelOperation) {
        this.inParOps.put(str, iParallelOperation);
    }

    /**
     * @return the task index supplied at construction
     */
    public int getIndex() {
        return this.taskIndex;
    }

    /**
     * Runs one scheduling round: executes queued input, flushes queued output, progresses all
     * operations and, when idle and checkpointable, runs the pending checkpoint.
     *
     * @return {@code true} when more work remains - input is still queued, a send was rejected,
     *     an operation reported progress, or a checkpoint was just taken; {@code false} otherwise
     * @throws IllegalStateException if an output message names an edge with no registered
     *     outgoing operation
     */
    public boolean execute() {
        // Stop feeding the task once the output queue reaches the low water mark.
        while (!this.inQueue.isEmpty() && this.outQueue.size() < this.lowWaterMark) {
            IMessage message = this.inQueue.poll();
            if (message != null) {
                this.task.execute(message);
            }
        }
        boolean nothingToProcess = this.inQueue.isEmpty();

        if (!flushOutQueue()) {
            nothingToProcess = false;
        }

        for (IParallelOperation op : this.outOpArray) {
            if (op.progress()) {
                nothingToProcess = false;
            }
        }
        for (IParallelOperation op : this.intOpArray) {
            if (op.progress()) {
                nothingToProcess = false;
            }
        }

        // Only attempt a checkpoint when both queues are empty.
        if (this.checkpointable && this.inQueue.isEmpty() && this.outQueue.isEmpty()) {
            long checkpointId = this.pendingCheckpoint.execute();
            if (checkpointId != -1) {
                this.task.onCheckpointPropagated(this.snapshot);
                this.taskContext.write(FT_GATHER_EDGE, Long.valueOf(checkpointId));
                scheduleBarriers(Long.valueOf(checkpointId));
                nothingToProcess = false;
            }
        }
        return !nothingToProcess;
    }

    /**
     * Sends queued output messages in order until the queue is empty or an operation refuses a
     * message. A refused message stays at the head of the queue for the next round.
     *
     * @return {@code true} if the queue was emptied, {@code false} if a send was rejected
     */
    private boolean flushOutQueue() {
        while (!this.outQueue.isEmpty()) {
            IMessage message = this.outQueue.peek();
            if (message == null) {
                continue;
            }
            String edge = message.edge();
            boolean barrier = (message.getFlag() & SYNC_BARRIER_FLAG) == SYNC_BARRIER_FLAG;
            IParallelOperation op = this.outParOps.get(edge);
            if (op == null) {
                // Fail with context instead of an anonymous NullPointerException.
                throw new IllegalStateException("No outgoing parallel operation registered for edge '"
                        + edge + "' in task " + this.taskName + " (global id " + this.globalTaskId + ")");
            }
            boolean accepted = barrier
                    ? op.sendBarrier(this.globalTaskId, (byte[]) message.getContent())
                    : op.send(this.globalTaskId, message, message.getFlag());
            if (!accepted) {
                return false;
            }
            this.outQueue.poll();
        }
        return true;
    }

    /**
     * Writes a barrier carrying the given id, encoded as an 8-byte long, to every output edge.
     *
     * @param l the barrier id; must not be {@code null}
     */
    public void scheduleBarriers(Long l) {
        // The same backing array is handed to every edge, as before.
        byte[] encodedId = ByteBuffer.allocate(Long.BYTES).putLong(l.longValue()).array();
        for (String edge : this.outEdgeArray) {
            this.taskContext.writeBarrier(edge, encodedId);
        }
    }

    /**
     * @return the wrapped task
     */
    public INode getNode() {
        return this.task;
    }

    /**
     * Closes the task if it implements {@link Closable}; otherwise does nothing.
     */
    public void close() {
        if (this.task instanceof Closable) {
            ((Closable) this.task).close();
        }
    }

    /**
     * @return the input queue this instance polls
     */
    public BlockingQueue<IMessage> getInQueue() {
        return this.inQueue;
    }

    /**
     * Decodes the barrier id stored as a long in the first eight bytes of the payload.
     */
    private long extractSyncId(byte[] bArr) {
        return ByteBuffer.wrap(bArr).getLong();
    }

    /**
     * Handles a barrier received on an input edge.
     *
     * <p>For a checkpointable task the barrier is scheduled on the pending checkpoint and the
     * method returns without resetting the operation. Otherwise, a gather-sink task first
     * reports the barrier id as a version update for its parent task, and in all remaining
     * cases the incoming operation for the edge is reset.
     *
     * @param str the edge the barrier arrived on
     * @param bArr barrier payload; the first eight bytes hold the barrier id
     * @return always {@code true}
     */
    public boolean sync(String str, byte[] bArr) {
        if (this.checkpointable) {
            long barrierId = extractSyncId(bArr);
            LOG.fine(() -> "Barrier received to " + this.globalTaskId + " with id " + barrierId + " from " + str);
            this.pendingCheckpoint.schedule(str, barrierId);
            return true;
        }
        if (this.ftGatherTask) {
            long version = extractSyncId(bArr);
            this.checkpointingClient.sendVersionUpdate(this.taskGraphName, this.checkpointingSGatherSink.getParentTaskId(), version, (requestID, i, message) -> {
                LOG.log(Level.FINE, "Checkpoint of " + this.globalTaskId + " committed with version : " + version);
            });
        }
        this.inParOps.get(str).reset();
        return true;
    }
}
