package com.hazelcast.jet.pipeline.test;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.AppendableTraverser;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.logging.ILogger;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.5.0.jar:com/hazelcast/jet/pipeline/test/LongStreamSourceP.class */
/**
 * Source processor that emits an increasing sequence of {@code long} values
 * at a configured rate.
 * <p>
 * Each processor instance starts at its own global processor index and
 * advances by the total parallelism after every emitted value, so the
 * instances emit disjoint, interleaved subsequences. The timestamp passed to
 * the {@link EventTimeMapper} for a value is derived from the start time and
 * the value itself ({@code start + value / eventsPerSecond}), not from the
 * moment the value is actually emitted.
 * <p>
 * As a side task the processor logs a message whenever two consecutive
 * {@link #complete()} calls are more than 10 ms apart and, when fine logging
 * is enabled, reports its own throughput every 5 seconds.
 */
public class LongStreamSourceP extends AbstractProcessor {
    private static final long SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS = 5;
    private static final long HICCUP_REPORT_THRESHOLD_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
    private static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

    private final long eventsPerSecond;
    private final EventTimeMapper<? super Long> eventTimeMapper;

    // Holds the constructor's start-time argument until init() converts it
    // into a System.nanoTime()-based start instant for this processor.
    private long startNanoTime;
    private long globalProcessorIndex;
    private long totalParallelism;
    private long lastReportNanos;
    private long valueAtLastReport;
    private long lastCallNanos;
    private long valueToEmit;
    private long nowNanoTime;
    private ILogger logger;

    // Difference, in milliseconds, between the System.nanoTime() clock and
    // System.currentTimeMillis(), sampled once when the instance is created.
    private final long nanoTimeMillisToCurrentTimeMillis = determineTimeOffset();
    private Traverser<Object> traverser = new AppendableTraverser<>(2);

    /**
     * Creates the processor.
     * NOTE(review): the decompiler reported the original access level of this
     * constructor as package-private; it is kept public here so that existing
     * callers keep compiling.
     *
     * @param startTime       start time in milliseconds; it is added to the
     *                        nanoTime-to-currentTimeMillis offset in
     *                        {@code init}, so it is presumably on the
     *                        {@code System.currentTimeMillis()} scale
     * @param eventsPerSecond emission rate; used as a divisor, so it must not be zero
     * @param eventTimePolicy policy handed to the {@link EventTimeMapper}
     */
    public LongStreamSourceP(long startTime, long eventsPerSecond, EventTimePolicy<? super Long> eventTimePolicy) {
        this.startNanoTime = startTime;
        this.eventsPerSecond = eventsPerSecond;
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.eventTimeMapper.addPartitions(1);
    }

    /**
     * Captures the logger and parallelism information from the context and
     * computes this processor's nanoTime-based start instant.
     *
     * @param context the processor context supplied by the engine
     */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) {
        this.logger = context.logger();
        this.totalParallelism = context.totalParallelism();
        this.globalProcessorIndex = context.globalProcessorIndex();
        this.valueToEmit = this.globalProcessorIndex;

        // Convert the configured start time to the nanoTime scale and shift it
        // by the scheduled offset of this processor's first value.
        long startMillisOnNanoClock = this.startNanoTime + this.nanoTimeMillisToCurrentTimeMillis;
        long firstValueOffsetNanos = this.valueToEmit * NANOS_PER_SECOND / this.eventsPerSecond;
        this.startNanoTime = TimeUnit.MILLISECONDS.toNanos(startMillisOnNanoClock) + firstValueOffsetNanos;

        this.lastReportNanos = this.startNanoTime;
        this.lastCallNanos = this.startNanoTime;
    }

    /**
     * Emits all values that are due at the current time, then performs hiccup
     * detection and (if fine logging is enabled) throughput reporting.
     *
     * @return always {@code false}
     */
    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        this.nowNanoTime = System.nanoTime();
        emitEvents();
        detectAndReportHiccup();
        if (this.logger.isFineEnabled()) {
            reportThroughput();
        }
        return false;
    }

    /**
     * Emits values until either the pending traverser could not be fully
     * emitted or the value scheduled for the current instant has been reached.
     */
    private void emitEvents() {
        // NOTE(review): this product is plain long arithmetic and is not
        // guarded against overflow for very large rates or elapsed times.
        long emitUpTo = (this.nowNanoTime - this.startNanoTime) * this.eventsPerSecond / NANOS_PER_SECOND;
        while (emitFromTraverser(this.traverser) && this.valueToEmit < emitUpTo) {
            long timestampNanoTime = this.startNanoTime + this.valueToEmit * NANOS_PER_SECOND / this.eventsPerSecond;
            // Translate the scheduled instant from the nanoTime scale back by the sampled offset.
            long timestamp = TimeUnit.NANOSECONDS.toMillis(timestampNanoTime) - this.nanoTimeMillisToCurrentTimeMillis;
            this.traverser = this.eventTimeMapper.flatMapEvent(
                    this.nowNanoTime, Long.valueOf(this.valueToEmit), 0, timestamp);
            this.valueToEmit += this.totalParallelism;
        }
    }

    /**
     * Logs an info message when the time since the previous call exceeds the
     * hiccup threshold, then remembers the current call time.
     */
    private void detectAndReportHiccup() {
        long sinceLastCallNanos = this.nowNanoTime - this.lastCallNanos;
        if (sinceLastCallNanos > HICCUP_REPORT_THRESHOLD_NANOS) {
            this.logger.info(String.format("*** Source #%d hiccup: %,d ms%n",
                    this.globalProcessorIndex, TimeUnit.NANOSECONDS.toMillis(sinceLastCallNanos)));
        }
        this.lastCallNanos = this.nowNanoTime;
    }

    /**
     * Logs, at fine level, the number of items per second this processor has
     * emitted since the previous report. Does nothing until the reporting
     * period has elapsed.
     */
    private void reportThroughput() {
        long nanosSinceLastReport = this.nowNanoTime - this.lastReportNanos;
        if (nanosSinceLastReport < TimeUnit.SECONDS.toNanos(SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS)) {
            return;
        }
        this.lastReportNanos = this.nowNanoTime;
        // valueToEmit advances by totalParallelism per item, so dividing the
        // delta by it yields the number of items emitted by this processor.
        long localItemCountSinceLastReport = (this.valueToEmit - this.valueAtLastReport) / this.totalParallelism;
        this.valueAtLastReport = this.valueToEmit;
        // Divide in floating point: integer division would truncate the elapsed
        // time to whole seconds and overstate the reported rate.
        double secondsSinceLastReport = (double) nanosSinceLastReport / NANOS_PER_SECOND;
        double itemsPerSecond = localItemCountSinceLastReport / secondsSinceLastReport;
        this.logger.fine(String.format("p%d: %,.0f items/second", this.globalProcessorIndex, itemsPerSecond));
    }

    /**
     * Samples the offset between the two JVM clocks.
     *
     * @return {@code System.nanoTime()} expressed in milliseconds minus
     *         {@code System.currentTimeMillis()}
     */
    private static long determineTimeOffset() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - System.currentTimeMillis();
    }
}
