package io.pravega.connectors.flink;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/* loaded from: input_file:io/pravega/connectors/flink/EventTimeOrderingFunction.class */
public class EventTimeOrderingFunction<T> extends KeyedProcessFunction<String, T, T> {
    private static final long serialVersionUID = 1;
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueue";
    private static final String LAST_TRIGGERING_TS_STATE_NAME = "lastTriggeringTsState";
    private final TypeInformation<T> typeInformation;
    private transient MapState<Long, List<T>> dataState;
    private transient ValueState<Long> lastTriggeringTsState;

    public EventTimeOrderingFunction(TypeInformation<T> typeInformation) {
        this.typeInformation = typeInformation;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.dataState = getRuntimeContext().getMapState(new MapStateDescriptor(EVENT_QUEUE_STATE_NAME, BasicTypeInfo.LONG_TYPE_INFO, new ListTypeInfo(this.typeInformation)));
        this.lastTriggeringTsState = getRuntimeContext().getState(new ValueStateDescriptor(LAST_TRIGGERING_TS_STATE_NAME, Long.class));
    }

    public void processElement(T t, KeyedProcessFunction<String, T, T>.Context context, Collector<T> collector) throws Exception {
        Long timestamp = context.timestamp();
        if (timestamp == null) {
            collector.collect(t);
            return;
        }
        Long l = (Long) this.lastTriggeringTsState.value();
        if (l == null || timestamp.longValue() > l.longValue()) {
            List list = (List) this.dataState.get(timestamp);
            if (list == null) {
                list = new ArrayList(1);
                context.timerService().registerEventTimeTimer(timestamp.longValue());
            }
            list.add(t);
            this.dataState.put(timestamp, list);
        }
    }

    public void onTimer(long j, KeyedProcessFunction<String, T, T>.OnTimerContext onTimerContext, Collector<T> collector) throws Exception {
        List list = (List) this.dataState.get(Long.valueOf(j));
        if (list != null) {
            Objects.requireNonNull(collector);
            list.forEach(collector::collect);
            this.dataState.remove(Long.valueOf(j));
            this.lastTriggeringTsState.update(Long.valueOf(j));
        }
    }
}
