package edu.iu.dsc.tws.tset.ops;

import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.modifiers.IONames;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.ComputeFunc;
import edu.iu.dsc.tws.task.window.api.GlobalStreamId;
import edu.iu.dsc.tws.task.window.api.IWindowMessage;
import edu.iu.dsc.tws.task.window.api.WindowLifeCycleListener;
import edu.iu.dsc.tws.task.window.core.BaseWindowedSink;
import edu.iu.dsc.tws.task.window.util.WindowParameter;
import edu.iu.dsc.tws.tset.sets.BaseTSet;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:edu/iu/dsc/tws/tset/ops/WindowComputeOp.class */
/**
 * Windowed compute operation: hands the contents of an activated window to a
 * {@link ComputeFunc} and forwards the computed value to the outgoing edges.
 *
 * <p>Single messages are added to the inherited window manager by
 * {@link #execute(IMessage)}. The listener created by
 * {@link #newWindowLifeCycleListener()} calls {@link #execute(IWindowMessage)} when a
 * window is activated.
 *
 * @param <O> type of the value produced by the compute function
 * @param <I> type of the message content consumed from a window
 */
public class WindowComputeOp<O, I> extends BaseWindowedSink<I> implements Receptor, Serializable {
    private TSetContext tSetContext = new TSetContext();

    /** Names this op can receive; declared from the key set of {@link #rcvTSets}. */
    private IONames receivables;

    /** Maps the name given to {@link #add(String, DataPartition)} to the key used in the TSet context. */
    private Map<String, String> rcvTSets;

    /** Created in {@link #prepare(Config, TaskContext)}; {@code null} before that call. */
    private MultiEdgeOpAdapter multiEdgeOpAdapter;

    private ComputeFunc<O, Iterator<I>> computeFunction;

    /**
     * Creates an op without any receivable inputs and without TSet identity.
     *
     * @param computeFunc     function applied to the contents of each activated window
     * @param windowParameter windowing configuration stored in the inherited field
     */
    public WindowComputeOp(ComputeFunc<O, Iterator<I>> computeFunc, WindowParameter windowParameter) {
        this.computeFunction = computeFunc;
        this.windowParameter = windowParameter;
        // Start from an empty mapping so add() and getReceivableNames() never observe null state.
        this.rcvTSets = new HashMap<String, String>();
        this.receivables = IONames.declare(this.rcvTSets.keySet());
    }

    /**
     * Creates an op and immediately initializes it via
     * {@link #initialize(BaseTSet, Map, WindowParameter)}.
     *
     * @param computeFunc     function applied to the contents of each activated window
     * @param baseTSet        originating TSet, may be {@code null}
     * @param map             receivable mapping, see {@link #initialize(BaseTSet, Map, WindowParameter)}
     * @param windowParameter windowing configuration
     */
    public WindowComputeOp(ComputeFunc<O, Iterator<I>> computeFunc, BaseTSet baseTSet, Map<String, String> map, WindowParameter windowParameter) {
        this.computeFunction = computeFunc;
        initialize(baseTSet, map, windowParameter);
    }

    /**
     * Stores the window parameter and receivable mapping, and copies id, name and
     * parallelism of the originating TSet into this op's {@link TSetContext}.
     *
     * @param baseTSet        originating TSet; when {@code null} the context is left untouched
     * @param map             receivable mapping whose keys are declared as receivable names;
     *                        {@code null} is treated as an empty mapping
     * @param windowParameter windowing configuration stored in the inherited field
     */
    public void initialize(BaseTSet baseTSet, Map<String, String> map, WindowParameter windowParameter) {
        Map<String, String> inputs = map != null ? map : new HashMap<String, String>();
        this.windowParameter = windowParameter;
        this.receivables = IONames.declare(inputs.keySet());
        this.rcvTSets = inputs;
        if (baseTSet != null) {
            this.tSetContext.setId(baseTSet.getId());
            this.tSetContext.setName(baseTSet.getName());
            this.tSetContext.setParallelism(baseTSet.getParallelism());
        }
    }

    /**
     * Prepares the parent sink and creates the adapter used to write to outgoing edges.
     *
     * @param config      configuration passed on to the superclass
     * @param taskContext task context passed on to the superclass and wrapped by the edge adapter
     */
    @Override
    public void prepare(Config config, TaskContext taskContext) {
        super.prepare(config, taskContext);
        this.multiEdgeOpAdapter = new MultiEdgeOpAdapter(taskContext);
    }

    /**
     * Applies the compute function to the contents of the given window and writes the
     * result to the outgoing edges.
     *
     * @param iWindowMessage window whose messages are unwrapped and passed to the function
     * @return always {@code true}
     */
    public boolean execute(IWindowMessage<I> iWindowMessage) {
        // Lazily unwrap each message to its content; the stream is consumed only as the
        // compute function pulls from the iterator.
        Iterator<I> contents = iWindowMessage.getWindow().stream()
                .map(IMessage::getContent)
                .iterator();
        writeToEdges(this.computeFunction.compute(contents));
        return true;
    }

    /**
     * Called by this op's life-cycle listener on window expiry; performs no work.
     *
     * @param iWindowMessage the window passed to the listener's expiry callback
     * @return always {@code true}
     */
    public boolean getExpire(IWindowMessage<I> iWindowMessage) {
        return true;
    }

    /**
     * Called from {@link #execute(IMessage)} for a timestamped message that the watermark
     * generator did not accept; performs no work.
     *
     * @param iMessage the message that was not added to the window manager
     * @return always {@code true}
     */
    public boolean getLateMessages(IMessage<I> iMessage) {
        return true;
    }

    /**
     * Adds a single incoming message to the window manager.
     *
     * <p>Without timestamps the message is added directly. With timestamps, the timestamp is
     * extracted from the message content and the message is added only when the watermark
     * generator's {@code track} call returns {@code true}; otherwise it is handed to
     * {@link #getLateMessages(IMessage)}.
     *
     * @param iMessage incoming message
     * @return always {@code true}
     */
    public boolean execute(IMessage<I> iMessage) {
        if (!isTimestamped()) {
            this.windowManager.add(iMessage);
            return true;
        }
        long timestamp = this.iTimestampExtractor.extractTimestamp(iMessage.getContent());
        boolean accepted = this.watermarkEventGenerator.track(new GlobalStreamId(iMessage.edge()), timestamp);
        if (accepted) {
            this.windowManager.add(iMessage, timestamp);
        } else {
            getLateMessages(iMessage);
        }
        return true;
    }

    /**
     * Creates the listener that connects window life-cycle events to this op: expiry is
     * routed to {@link #getExpire(IWindowMessage)} and activation to
     * {@link #execute(IWindowMessage)}.
     *
     * @return a new listener bound to this instance
     */
    protected WindowLifeCycleListener<I> newWindowLifeCycleListener() {
        return new WindowLifeCycleListener<I>() {
            @Override
            public void onExpiry(IWindowMessage<I> iWindowMessage) {
                WindowComputeOp.this.getExpire(iWindowMessage);
            }

            @Override
            public void onActivation(IWindowMessage<I> iWindowMessage, IWindowMessage<I> iWindowMessage2, IWindowMessage<I> iWindowMessage3) {
                // Only the first argument is used; it is stored in the inherited field and then processed.
                WindowComputeOp.this.collectiveEvents = iWindowMessage;
                WindowComputeOp.this.execute(WindowComputeOp.this.collectiveEvents);
            }
        };
    }

    /** Writes a value to the outgoing edges; requires {@link #prepare(Config, TaskContext)} to have run. */
    <T> void writeToEdges(T t) {
        this.multiEdgeOpAdapter.writeToEdges(t);
    }

    /** Delegates to the edge adapter's {@code writeEndToEdges}; requires a prior {@code prepare}. */
    void writeEndToEdges() {
        this.multiEdgeOpAdapter.writeEndToEdges();
    }

    /** Writes a key/value pair to the outgoing edges; requires a prior {@code prepare}. */
    <K, V> void keyedWriteToEdges(K k, V v) {
        this.multiEdgeOpAdapter.keyedWriteToEdges(k, v);
    }

    /**
     * Registers a received data partition as an input of the TSet context, under the key
     * that the receivable mapping holds for {@code str}.
     *
     * @param str           name under which the partition was received
     * @param dataPartition received data
     */
    public void add(String str, DataPartition<?> dataPartition) {
        // Map.get yields null for an unmapped name, so the key passed on may be null.
        this.tSetContext.addInput(this.rcvTSets.get(str), dataPartition);
    }

    /**
     * @return the names this op can receive; empty when no mapping was supplied
     */
    public IONames getReceivableNames() {
        return this.receivables;
    }

    /** @return the TSet context owned by this op */
    TSetContext gettSetContext() {
        return this.tSetContext;
    }

    /** @return the compute function applied to each activated window */
    public ComputeFunc<O, Iterator<I>> getFunction() {
        return this.computeFunction;
    }
}
