package edu.iu.dsc.tws.executor.core.streaming;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.compute.executor.ISync;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Closable;
import edu.iu.dsc.tws.api.compute.nodes.ICompute;
import edu.iu.dsc.tws.api.compute.nodes.INode;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.checkpointing.api.SnapshotImpl;
import edu.iu.dsc.tws.checkpointing.task.CheckpointableTask;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingConfigurations;
import edu.iu.dsc.tws.executor.core.TaskCheckpointUtils;
import edu.iu.dsc.tws.executor.core.TaskContextImpl;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/core/streaming/SinkStreamingInstance.class */
public class SinkStreamingInstance implements INodeInstance, ISync {
    private static final Logger LOG = Logger.getLogger(SinkStreamingInstance.class.getName());
    private final boolean checkpointable;
    protected ICompute streamingTask;
    protected BlockingQueue<IMessage> streamingInQueue;
    protected Map<String, IParallelOperation> streamingInParOps = new HashMap();
    protected Config config;
    protected int globalTaskId;
    protected int taskId;
    protected int streamingTaskIndex;
    protected int parallelism;
    protected String taskName;
    protected Map<String, Object> nodeConfigs;
    protected int workerId;
    protected Map<String, Set<String>> inEdges;
    protected TaskSchedulePlan taskSchedulePlan;
    private CheckpointingClient checkpointingClient;
    private String taskGraphName;
    private Long taskVersion;
    private StateStore stateStore;
    private SnapshotImpl snapshot;
    private PendingCheckpoint pendingCheckpoint;
    private IParallelOperation[] intOpArray;
    private String[] inEdgeArray;

    public SinkStreamingInstance(ICompute iCompute, BlockingQueue<IMessage> blockingQueue, Config config, String str, int i, int i2, int i3, int i4, int i5, Map<String, Object> map, Map<String, Set<String>> map2, TaskSchedulePlan taskSchedulePlan, CheckpointingClient checkpointingClient, String str2, Long l) {
        this.streamingTask = iCompute;
        this.streamingInQueue = blockingQueue;
        this.taskId = i;
        this.config = config;
        this.globalTaskId = i2;
        this.streamingTaskIndex = i3;
        this.parallelism = i4;
        this.nodeConfigs = map;
        this.workerId = i5;
        this.taskName = str;
        this.inEdges = map2;
        this.taskSchedulePlan = taskSchedulePlan;
        this.checkpointingClient = checkpointingClient;
        this.taskGraphName = str2;
        this.taskVersion = l;
        this.checkpointable = (this.streamingTask instanceof CheckpointableTask) && CheckpointingConfigurations.isCheckpointingEnabled(config);
        this.snapshot = new SnapshotImpl();
    }

    public void prepare(Config config) {
        this.streamingTask.prepare(config, new TaskContextImpl(this.streamingTaskIndex, this.taskId, this.globalTaskId, this.taskName, this.parallelism, this.workerId, this.nodeConfigs, this.inEdges, this.taskSchedulePlan, OperationMode.STREAMING));
        this.intOpArray = new IParallelOperation[this.streamingInParOps.size()];
        int i = 0;
        Iterator<Map.Entry<String, IParallelOperation>> it = this.streamingInParOps.entrySet().iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            this.intOpArray[i2] = it.next().getValue();
        }
        this.inEdgeArray = new String[this.inEdges.size()];
        int i3 = 0;
        Iterator<String> it2 = this.inEdges.keySet().iterator();
        while (it2.hasNext()) {
            int i4 = i3;
            i3++;
            this.inEdgeArray[i4] = it2.next();
        }
        if (this.checkpointable) {
            this.stateStore = CheckpointUtils.getStateStore(this.config);
            this.stateStore.init(this.config, new String[]{this.taskGraphName, String.valueOf(this.globalTaskId)});
            this.pendingCheckpoint = new PendingCheckpoint(this.taskGraphName, this.streamingTask, this.globalTaskId, this.intOpArray, this.inEdges.size(), this.checkpointingClient, this.stateStore, this.snapshot);
            TaskCheckpointUtils.restore(this.streamingTask, this.snapshot, this.stateStore, this.taskVersion.longValue(), this.globalTaskId);
        }
    }

    public boolean execute() {
        while (!this.streamingInQueue.isEmpty()) {
            IMessage poll = this.streamingInQueue.poll();
            if (poll != null) {
                this.streamingTask.execute(poll);
            }
        }
        for (int i = 0; i < this.intOpArray.length; i++) {
            this.intOpArray[i].progress();
        }
        if (!this.checkpointable || !this.streamingInQueue.isEmpty() || this.pendingCheckpoint.execute() == -1) {
            return true;
        }
        this.streamingTask.onCheckpointPropagated(this.snapshot);
        return true;
    }

    public INode getNode() {
        return this.streamingTask;
    }

    public void close() {
        if (this.streamingTask instanceof Closable) {
            this.streamingTask.close();
        }
    }

    public void registerInParallelOperation(String str, IParallelOperation iParallelOperation) {
        this.streamingInParOps.put(str, iParallelOperation);
    }

    public BlockingQueue<IMessage> getStreamingInQueue() {
        return this.streamingInQueue;
    }

    public boolean sync(String str, byte[] bArr) {
        if (!this.checkpointable) {
            return true;
        }
        long j = ByteBuffer.wrap(bArr).getLong();
        LOG.fine(() -> {
            return "Barrier received to " + this.globalTaskId + " with id " + j + " from " + str;
        });
        this.pendingCheckpoint.schedule(str, j);
        return true;
    }
}
