package edu.iu.dsc.tws.executor.core.streaming;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.checkpointing.Snapshot;
import edu.iu.dsc.tws.api.checkpointing.StateStore;
import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.OutputCollection;
import edu.iu.dsc.tws.api.compute.TaskMessage;
import edu.iu.dsc.tws.api.compute.executor.ExecutorContext;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.executor.IParallelOperation;
import edu.iu.dsc.tws.api.compute.graph.OperationMode;
import edu.iu.dsc.tws.api.compute.modifiers.Closable;
import edu.iu.dsc.tws.api.compute.nodes.INode;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.checkpointing.api.SnapshotImpl;
import edu.iu.dsc.tws.checkpointing.task.CheckpointableTask;
import edu.iu.dsc.tws.checkpointing.util.CheckpointUtils;
import edu.iu.dsc.tws.checkpointing.util.CheckpointingConfigurations;
import edu.iu.dsc.tws.executor.core.DefaultOutputCollection;
import edu.iu.dsc.tws.executor.core.TaskCheckpointUtils;
import edu.iu.dsc.tws.executor.core.TaskContextImpl;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/executor/core/streaming/SourceStreamingInstance.class */
public class SourceStreamingInstance implements INodeInstance {
    private static final Logger LOG = Logger.getLogger(SourceStreamingInstance.class.getName());
    private ISource streamingTask;
    private BlockingQueue<IMessage> outStreamingQueue;
    private Config config;
    private OutputCollection outputStreamingCollection;
    private int globalTaskId;
    private int taskId;
    private int streamingTaskIndex;
    private int parallelism;
    private String taskName;
    private Map<String, Object> nodeConfigs;
    private int workerId;
    private int lowWaterMark;
    private int highWaterMark;
    private Map<String, String> outEdges;
    private TaskSchedulePlan taskSchedule;
    private CheckpointingClient checkpointingClient;
    private String taskGraphName;
    private long tasksVersion;
    private boolean checkpointable;
    private StateStore stateStore;
    private long checkPointingFrequency;
    private IParallelOperation[] outOpArray;
    private String[] outEdgeArray;
    private Map<String, IParallelOperation> outStreamingParOps = new HashMap();
    private long checkpointVersion = 0;
    private int barrierMessagesSent = 0;
    private Queue<Snapshot> snapshotQueue = new LinkedList();
    private long executions = 0;
    private SnapshotImpl snapshot = new SnapshotImpl();

    public SourceStreamingInstance(ISource iSource, BlockingQueue<IMessage> blockingQueue, Config config, String str, int i, int i2, int i3, int i4, int i5, Map<String, Object> map, Map<String, String> map2, TaskSchedulePlan taskSchedulePlan, CheckpointingClient checkpointingClient, String str2, long j) {
        this.checkPointingFrequency = 1000L;
        this.streamingTask = iSource;
        this.taskId = i;
        this.outStreamingQueue = blockingQueue;
        this.config = config;
        this.globalTaskId = i2;
        this.streamingTaskIndex = i3;
        this.parallelism = i4;
        this.taskName = str;
        this.nodeConfigs = map;
        this.workerId = i5;
        this.lowWaterMark = ExecutorContext.instanceQueueLowWaterMark(config);
        this.highWaterMark = ExecutorContext.instanceQueueHighWaterMark(config);
        this.outEdges = map2;
        this.taskSchedule = taskSchedulePlan;
        this.checkpointingClient = checkpointingClient;
        this.taskGraphName = str2;
        this.tasksVersion = j;
        this.checkpointable = (this.streamingTask instanceof CheckpointableTask) && CheckpointingConfigurations.isCheckpointingEnabled(config);
        this.checkPointingFrequency = CheckpointingConfigurations.getCheckPointingFrequency(config);
    }

    public void prepare(Config config) {
        this.outputStreamingCollection = new DefaultOutputCollection(this.outStreamingQueue);
        this.streamingTask.prepare(config, new TaskContextImpl(this.streamingTaskIndex, this.taskId, this.globalTaskId, this.taskName, this.parallelism, this.workerId, this.outputStreamingCollection, this.nodeConfigs, this.outEdges, this.taskSchedule, OperationMode.STREAMING));
        this.outOpArray = new IParallelOperation[this.outStreamingParOps.size()];
        int i = 0;
        Iterator<Map.Entry<String, IParallelOperation>> it = this.outStreamingParOps.entrySet().iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            this.outOpArray[i2] = it.next().getValue();
        }
        this.outEdgeArray = new String[this.outEdges.size()];
        int i3 = 0;
        Iterator<String> it2 = this.outEdges.keySet().iterator();
        while (it2.hasNext()) {
            int i4 = i3;
            i3++;
            this.outEdgeArray[i4] = it2.next();
        }
        if (this.checkpointable) {
            this.stateStore = CheckpointUtils.getStateStore(this.config);
            this.stateStore.init(this.config, new String[]{this.taskGraphName, String.valueOf(this.globalTaskId)});
            TaskCheckpointUtils.restore(this.streamingTask, this.snapshot, this.stateStore, this.tasksVersion, this.globalTaskId);
        }
    }

    /*  JADX ERROR: Failed to decode insn: 0x0025: MOVE_MULTI, method: edu.iu.dsc.tws.executor.core.streaming.SourceStreamingInstance.execute():boolean
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[9]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /*  JADX ERROR: Failed to decode insn: 0x005D: MOVE_MULTI, method: edu.iu.dsc.tws.executor.core.streaming.SourceStreamingInstance.execute():boolean
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[9]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    public boolean execute() {
        /*
            Method dump skipped, instructions count: 312
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: edu.iu.dsc.tws.executor.core.streaming.SourceStreamingInstance.execute():boolean");
    }

    public INode getNode() {
        return this.streamingTask;
    }

    public void close() {
        if (this.streamingTask instanceof Closable) {
            this.streamingTask.close();
        }
    }

    public BlockingQueue<IMessage> getOutStreamingQueue() {
        return this.outStreamingQueue;
    }

    public void scheduleBarriers(Long l) {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putLong(l.longValue());
        Iterator<String> it = this.outStreamingParOps.keySet().iterator();
        while (it.hasNext()) {
            this.outStreamingQueue.add(new TaskMessage(allocate.array(), 33554432, it.next(), this.globalTaskId));
        }
    }

    public void registerOutParallelOperation(String str, IParallelOperation iParallelOperation) {
        this.outStreamingParOps.put(str, iParallelOperation);
    }
}
