package org.apache.pulsar.functions.windowing.triggers;

import org.apache.pulsar.functions.windowing.DefaultEvictionContext;
import org.apache.pulsar.functions.windowing.Event;
import org.apache.pulsar.functions.windowing.EvictionPolicy;
import org.apache.pulsar.functions.windowing.TriggerHandler;
import org.apache.pulsar.functions.windowing.TriggerPolicy;
import org.apache.pulsar.functions.windowing.WindowManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.0-rc-202203202207.jar:org/apache/pulsar/functions/windowing/triggers/WatermarkTimeTriggerPolicy.class */
/**
 * Trigger policy that reacts to watermark events: each time a watermark is tracked, it invokes
 * the {@link TriggerHandler} for every window end timestamp that is less than or equal to the
 * watermark timestamp, advancing by the sliding interval between windows.
 *
 * <p>The only state carried between watermarks is the end timestamp of the next window to
 * evaluate, which is exposed through {@link #getState()} / {@link #restoreState(Long)}.
 *
 * @param <T> the type of the payload carried by tracked events
 */
public class WatermarkTimeTriggerPolicy<T> implements TriggerPolicy<T, Long> {
    private static final Logger log = LoggerFactory.getLogger(WatermarkTimeTriggerPolicy.class);

    /** Distance, in milliseconds, between two consecutive window end timestamps. */
    private final long slidingIntervalMs;
    /** Callback invoked once per candidate window end timestamp. */
    private final TriggerHandler handler;
    /** Receives a fresh eviction context before every trigger attempt. */
    private final EvictionPolicy<T, ?> evictionPolicy;
    /** Queried for event counts and for the earliest event timestamp in a range. */
    private final WindowManager<T> windowManager;
    /** End timestamp of the next window to evaluate; updated after each watermark is handled. */
    private volatile long nextWindowEndTs;
    /** Watermarks are ignored until {@link #start()} has been called. */
    private boolean started = false;

    /**
     * Creates a policy that is not yet started.
     *
     * @param slidingIntervalMs step between consecutive window end timestamps, in milliseconds
     * @param triggerHandler    handler invoked for each window end timestamp reached by a watermark
     * @param evictionPolicy    eviction policy whose context is refreshed before each trigger
     * @param windowManager     window manager used to look up event counts and timestamps
     */
    public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler triggerHandler,
                                      EvictionPolicy<T, ?> evictionPolicy, WindowManager<T> windowManager) {
        this.slidingIntervalMs = slidingIntervalMs;
        this.handler = triggerHandler;
        this.evictionPolicy = evictionPolicy;
        this.windowManager = windowManager;
    }

    /**
     * Processes the event only if the policy has been started and the event is a watermark;
     * all other events are ignored by this policy.
     *
     * @param event the event to inspect
     */
    @Override
    public void track(Event<T> event) {
        if (started && event.isWatermark()) {
            handleWaterMarkEvent(event);
        }
    }

    /** No-op: this policy has nothing to reset. */
    @Override
    public void reset() {
    }

    /** Enables watermark handling in {@link #track(Event)}. */
    @Override
    public void start() {
        started = true;
    }

    /** No-op: this policy holds no resources to release. */
    @Override
    public void shutdown() {
    }

    /**
     * Fires the trigger handler for every window end timestamp up to and including the
     * watermark timestamp, then stores the first window end timestamp that was not processed.
     *
     * @param event the watermark event whose timestamp bounds the windows to evaluate
     */
    private void handleWaterMarkEvent(Event<T> event) {
        final long watermarkTs = event.getTimestamp();
        long windowEndTs = nextWindowEndTs;
        if (log.isDebugEnabled()) {
            log.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
        }
        while (windowEndTs <= watermarkTs) {
            long currentCount = windowManager.getEventCount(windowEndTs);
            evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
            if (handler.onTrigger()) {
                windowEndTs += slidingIntervalMs;
                continue;
            }
            // The handler did not fire for this window end; jump ahead to the next
            // interval-aligned timestamp derived from the earliest event in range.
            long alignedTs = getNextAlignedWindowTs(windowEndTs, watermarkTs);
            if (log.isDebugEnabled()) {
                log.debug("Next aligned window end ts {}", alignedTs);
            }
            if (alignedTs == Long.MAX_VALUE) {
                if (log.isDebugEnabled()) {
                    log.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                }
                // Nothing left to process up to this watermark. Without this exit the loop
                // condition would never change and the method would spin forever.
                break;
            }
            windowEndTs = alignedTs;
        }
        nextWindowEndTs = windowEndTs;
    }

    /**
     * Looks up the earliest event timestamp reported by the window manager for the given range
     * and rounds it up to the next multiple of the sliding interval.
     *
     * @param startTs start of the range passed to the window manager
     * @param endTs   end of the range passed to the window manager
     * @return the earliest event timestamp rounded up to a multiple of the sliding interval
     *         (unchanged if already a multiple), or {@code Long.MAX_VALUE} if that is what the
     *         window manager reported
     */
    private long getNextAlignedWindowTs(long startTs, long endTs) {
        long earliestEventTs = windowManager.getEarliestEventTs(startTs, endTs);
        if (earliestEventTs == Long.MAX_VALUE) {
            return earliestEventTs;
        }
        long remainder = earliestEventTs % slidingIntervalMs;
        if (remainder == 0) {
            return earliestEventTs;
        }
        return earliestEventTs + (slidingIntervalMs - remainder);
    }

    /**
     * Returns the end timestamp of the next window this policy will evaluate.
     *
     * @return the current next-window end timestamp
     */
    @Override
    public Long getState() {
        return nextWindowEndTs;
    }

    /**
     * Replaces the next-window end timestamp with a previously saved value.
     *
     * @param state the timestamp to restore; must not be {@code null}
     */
    @Override
    public void restoreState(Long state) {
        nextWindowEndTs = state;
    }

    @Override
    public String toString() {
        return "WatermarkTimeTriggerPolicy{slidingIntervalMs=" + slidingIntervalMs
                + ", nextWindowEndTs=" + nextWindowEndTs
                + ", started=" + started + '}';
    }
}
