package stream.scotty.sparkconnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import stream.scotty.core.AggregateWindow;
import stream.scotty.core.windowFunction.AggregateFunction;
import stream.scotty.core.windowType.Window;
import stream.scotty.slicing.SlicingWindowOperator;
import stream.scotty.sparkconnector.demo.DemoEvent;
import stream.scotty.state.memory.MemoryStateFactory;

/* loaded from: input_file:stream/scotty/sparkconnector/KeyedScottyWindowOperator.class */
public class KeyedScottyWindowOperator<Key, Value> implements FlatMapFunction<DemoEvent, Value> {
    private long lastWatermark;
    private AggregateFunction windowFunction;
    private long allowedLateness;
    private long watermarkEvictionPeriod = 100;
    private List<Window> windows = new ArrayList();
    private MemoryStateFactory stateFactory = new MemoryStateFactory();
    private HashMap<Key, SlicingWindowOperator<Value>> slicingWindowOperatorMap = new HashMap<>();

    public KeyedScottyWindowOperator(AggregateFunction aggregateFunction, long j) {
        this.windowFunction = aggregateFunction;
        this.allowedLateness = j;
    }

    public SlicingWindowOperator<Value> initWindowOperator() {
        SlicingWindowOperator<Value> slicingWindowOperator = new SlicingWindowOperator<>(this.stateFactory);
        Iterator<Window> it = this.windows.iterator();
        while (it.hasNext()) {
            slicingWindowOperator.addWindowAssigner(it.next());
        }
        slicingWindowOperator.addAggregation(this.windowFunction);
        slicingWindowOperator.setMaxLateness(this.allowedLateness);
        return slicingWindowOperator;
    }

    public Iterator<Value> call(DemoEvent demoEvent) throws Exception {
        Integer key = demoEvent.getKey();
        if (!this.slicingWindowOperatorMap.containsKey(key)) {
            this.slicingWindowOperatorMap.put(key, initWindowOperator());
        }
        this.slicingWindowOperatorMap.get(key).processElement(demoEvent.getValue(), demoEvent.getTimestamp());
        ArrayList<Value> arrayList = new ArrayList<>();
        processWatermark(demoEvent.getTimestamp(), key, arrayList);
        return arrayList.iterator();
    }

    private void processWatermark(long j, Key key, ArrayList<Value> arrayList) {
        if (j > this.lastWatermark + this.watermarkEvictionPeriod) {
            Iterator<SlicingWindowOperator<Value>> it = this.slicingWindowOperatorMap.values().iterator();
            while (it.hasNext()) {
                for (AggregateWindow aggregateWindow : it.next().processWatermark(j)) {
                    if (aggregateWindow.hasValue()) {
                        System.out.println("AggregateWindow: " + aggregateWindow);
                        arrayList.addAll(aggregateWindow.getAggValues());
                    }
                }
            }
            this.lastWatermark = j;
        }
    }

    public KeyedScottyWindowOperator addWindow(Window window) {
        this.windows.add(window);
        return this;
    }
}
