package org.apache.pulsar.io.core;

import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.functions.api.Record;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-core-2.8.0.3.jar:org/apache/pulsar/io/core/BatchPushSource.class */
public abstract class BatchPushSource<T> implements BatchSource<T> {
    private static final int DEFAULT_QUEUE_LENGTH = 1000;
    private final NullRecord nullRecord = new NullRecord();
    private LinkedBlockingQueue<Record<T>> queue = new LinkedBlockingQueue<>(getQueueLength());

    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-core-2.8.0.3.jar:org/apache/pulsar/io/core/BatchPushSource$ErrorNotifierRecord.class */
    private static class ErrorNotifierRecord implements Record {
        private Exception e;

        public ErrorNotifierRecord(Exception exc) {
            this.e = exc;
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Object getValue() {
            return null;
        }

        public Exception getException() {
            return this.e;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-core-2.8.0.3.jar:org/apache/pulsar/io/core/BatchPushSource$NullRecord.class */
    public static class NullRecord implements Record {
        private NullRecord() {
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Object getValue() {
            return null;
        }
    }

    @Override // org.apache.pulsar.io.core.BatchSource
    public Record<T> readNext() throws Exception {
        Record<T> take = this.queue.take();
        if (take instanceof ErrorNotifierRecord) {
            throw ((ErrorNotifierRecord) take).getException();
        }
        if (take instanceof NullRecord) {
            return null;
        }
        return take;
    }

    public void consume(Record<T> record) {
        try {
            if (record != null) {
                this.queue.put(record);
            } else {
                this.queue.put(this.nullRecord);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public int getQueueLength() {
        return DEFAULT_QUEUE_LENGTH;
    }

    public void notifyError(Exception exc) {
        consume(new ErrorNotifierRecord(exc));
    }
}
