package io.streamthoughts.azkarra.api.events;

import io.streamthoughts.azkarra.api.model.KV;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/streamthoughts/azkarra/api/events/EventStream.class */
public class EventStream<K, V> {
    private final String type;
    private final BlockingRecordQueue<K, V> queue;
    private final AtomicBoolean opened = new AtomicBoolean(false);

    /* loaded from: input_file:io/streamthoughts/azkarra/api/events/EventStream$Builder.class */
    public static class Builder {
        private String eventType;
        private Integer queueSize;
        private LimitHandler queueLimitHandler;
        private Duration maxBlockingTime;

        public Builder(String str) {
            this.eventType = str;
        }

        public Builder withQueueSize(int i) {
            this.queueSize = Integer.valueOf(i);
            return this;
        }

        public Builder withMaxBlockingTime(Duration duration) {
            this.maxBlockingTime = duration;
            return this;
        }

        public Builder withQueueLimitHandler(LimitHandler limitHandler) {
            this.queueLimitHandler = limitHandler;
            return this;
        }

        public <K, V> EventStream<K, V> build() {
            return new EventStream<>(this.eventType, new BasicBlockingRecordQueue(((Integer) Optional.ofNullable(this.queueSize).orElse(10000)).intValue(), (Duration) Optional.ofNullable(this.maxBlockingTime).orElse(BasicBlockingRecordQueue.DEFAULT_MAX_BLOCK_DURATION), (LimitHandler) Optional.ofNullable(this.queueLimitHandler).orElse(LimitHandlers.NO_OP)));
        }
    }

    public EventStream(String str, BlockingRecordQueue<K, V> blockingRecordQueue) {
        this.type = (String) Objects.requireNonNull(str, "eventType cannot be null");
        this.queue = (BlockingRecordQueue) Objects.requireNonNull(blockingRecordQueue, "queue cannot be null");
    }

    public String type() {
        return this.type;
    }

    public synchronized void open(EventStreamPipe<K, V> eventStreamPipe) {
        Objects.requireNonNull(eventStreamPipe, "pipe cannot be null");
        if (this.opened.get()) {
            throw new IllegalStateException("This stream is already open");
        }
        eventStreamPipe.onOpen(this.queue);
        this.queue.open();
    }

    public synchronized void close() {
        try {
            this.queue.close();
            this.queue.clear();
        } finally {
            this.opened.set(false);
        }
    }

    public void send(V v) {
        send((KV) KV.of(null, v));
    }

    public void send(K k, V v) {
        send((KV) KV.of(k, v));
    }

    public void send(K k, V v, long j) {
        send((KV) KV.of(k, v, Long.valueOf(j)));
    }

    public void send(KV<K, V> kv) {
        this.queue.send(kv);
    }

    public String toString() {
        return "EventStream{type='" + this.type + "\"}";
    }
}
