package io.streamthoughts.azkarra.api.events.callback;

import io.streamthoughts.azkarra.api.events.BlockingRecordQueue;
import io.streamthoughts.azkarra.api.events.LimitHandler;
import io.streamthoughts.azkarra.api.events.LimitHandlers;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:io/streamthoughts/azkarra/api/events/callback/LimitedQueueCallback.class */
public class LimitedQueueCallback implements LimitQueueCallback {
    private final AtomicInteger remaining;
    private volatile LimitHandler limitHandler = LimitHandlers.NO_OP;
    private volatile BlockingRecordQueue<?, ?> queue;
    private final int limit;

    public LimitedQueueCallback(int i) {
        if (i < 0) {
            throw new IllegalArgumentException("limit must be positive, given: " + i);
        }
        this.limit = i;
        this.remaining = new AtomicInteger(i);
    }

    @Override // io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback
    public void setLimitHandler(LimitHandler limitHandler) {
        this.limitHandler = (LimitHandler) Objects.requireNonNull(limitHandler, "limitHandler");
        if (this.remaining.get() == 0) {
            limitHandler.onLimitReached(this.queue);
        }
    }

    @Override // io.streamthoughts.azkarra.api.events.callback.LimitQueueCallback
    public void setQueue(BlockingRecordQueue blockingRecordQueue) {
        this.queue = blockingRecordQueue;
    }

    @Override // io.streamthoughts.azkarra.api.events.callback.QueueCallback
    public void onQueued() {
        if (this.remaining.decrementAndGet() == 0) {
            this.limitHandler.onLimitReached(this.queue);
        }
    }

    @Override // io.streamthoughts.azkarra.api.events.callback.QueueCallback
    public void onClosed() {
        this.remaining.set(this.limit);
    }
}
