package io.activej.datastream.processor;

import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.promise.Promise;
import java.util.Objects;

/* loaded from: input_file:io/activej/datastream/processor/StreamBuffer.class */
/**
 * A stream transformer that places a bounded ring buffer between its input
 * consumer and its output supplier.
 *
 * <p>Items received while the input is wired to the internal buffering acceptor
 * are stored in a power-of-two sized array addressed by the free-running
 * {@code head} (read) and {@code tail} (write) counters. The input is suspended
 * once {@link #size()} reaches {@code bufferMaxSize} and is resumed again once
 * it drops to {@code bufferMinSize} or below. When the buffer is empty and the
 * output is ready, the input is wired straight to the output's data acceptor so
 * items bypass the buffer.
 *
 * @param <T> type of the items passing through the buffer
 */
public final class StreamBuffer<T> implements StreamTransformer<T, T>, WithInitializer<StreamBuffer<T>> {
    private static final boolean CHECK = Checks.isEnabled(StreamBuffer.class);
    private static final boolean NULLIFY_ON_TAKE_OUT = ApplicationSettings.getBoolean(StreamBuffer.class, "nullifyOnTakeOut", true);

    private final Input input;
    private final Output output;

    /** Ring storage; its length is always a power of two so positions can be masked. */
    private final Object[] elements;
    /** Write counter: masked position of the next slot to fill. */
    private int tail;
    /** Read counter: masked position of the next slot to drain. */
    private int head;

    private final int bufferMinSize;
    private final int bufferMaxSize;

    /** Acceptor handed to the input when incoming items have to be buffered. */
    private final StreamDataAcceptor<T> toBuffer;

    /**
     * Consumer side of the buffer.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier of this class
     * was originally {@code private}; visibility is kept as-is here.
     */
    public final class Input extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onStarted() {
            StreamBuffer.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamConsumer
        protected void onEndOfStream() {
            // Drain whatever is still buffered; flush() sends end-of-stream once empty.
            StreamBuffer.this.output.flush();
        }
    }

    /**
     * Supplier side of the buffer.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier of this class
     * was originally {@code private}; visibility is kept as-is here.
     */
    public final class Output extends AbstractStreamSupplier<T> {
        private Output() {
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onResumed() {
            flush();
        }

        /**
         * Pushes buffered items to the current data acceptor until either the buffer
         * is drained or no acceptor is available, then publishes the new read
         * position, sends end-of-stream if the buffer is empty and the input has
         * ended, and re-synchronizes the input's suspend/resume state.
         */
        void flush() {
            int readPos = StreamBuffer.this.head;
            final int tailSnapshot = StreamBuffer.this.tail;
            while (true) {
                // Re-read the acceptor on every iteration: the loop must stop as soon
                // as it becomes unavailable (null), even in the middle of a flush.
                StreamDataAcceptor<T> acceptor = getDataAcceptor();
                if (acceptor == null || readPos == tailSnapshot) {
                    break;
                }
                int slot = readPos++ & (StreamBuffer.this.elements.length - 1);
                // Only doAdd(T) ever writes into the array, so the cast is safe.
                @SuppressWarnings("unchecked")
                T item = (T) StreamBuffer.this.elements[slot];
                if (StreamBuffer.NULLIFY_ON_TAKE_OUT) {
                    // Drop the reference so the buffer does not keep the item reachable.
                    StreamBuffer.this.elements[slot] = null;
                }
                acceptor.accept(item);
            }
            if (StreamBuffer.CHECK) {
                Checks.checkState(tailSnapshot == StreamBuffer.this.tail, "New items have been added to buffer while flushing");
            }
            StreamBuffer.this.head = readPos;
            if (StreamBuffer.this.isEmpty() && StreamBuffer.this.input.isEndOfStream()) {
                sendEndOfStream();
            }
            StreamBuffer.this.sync();
        }

        @Override // io.activej.datastream.AbstractStreamSupplier
        protected void onSuspended() {
            StreamBuffer.this.sync();
        }
    }

    /**
     * Creates the buffer and cross-wires the acknowledgements of both sides.
     *
     * @param bufferMinSize size at or below which the input is resumed; must be {@code >= 0}
     * @param bufferMaxSize size at or above which the input is suspended; must be {@code > 0}
     */
    private StreamBuffer(int bufferMinSize, int bufferMaxSize) {
        Checks.checkArgument(bufferMaxSize > 0 && bufferMinSize >= 0);
        this.bufferMinSize = bufferMinSize;
        this.bufferMaxSize = bufferMaxSize;
        // Round the capacity up to the next power of two so that
        // "counter & (length - 1)" yields the array position.
        this.elements = new Object[1 << (32 - Integer.numberOfLeadingZeros(this.bufferMaxSize - 1))];
        this.input = new Input();
        this.output = new Output();
        this.toBuffer = item -> {
            doAdd(item);
            if (isSaturated()) {
                this.input.suspend();
                this.output.flush();
            }
        };
        // An exception on the input side closes the output with that exception.
        this.input.getAcknowledgement()
                .whenException(this.output::closeEx);
        // The output's acknowledgement (result or exception) is forwarded to the input.
        this.output.getAcknowledgement()
                .whenResult(this.input::acknowledge)
                .whenException(this.input::closeEx);
    }

    /**
     * Creates a new buffer.
     *
     * @param bufferMinSize size at or below which the input is resumed; must be {@code >= 0}
     * @param bufferMaxSize size at or above which the input is suspended; must be {@code > 0}
     * @param <T>           type of the buffered items
     * @return a new {@code StreamBuffer}
     */
    public static <T> StreamBuffer<T> create(int bufferMinSize, int bufferMaxSize) {
        return new StreamBuffer<>(bufferMinSize, bufferMaxSize);
    }

    /** Returns {@code true} when the number of buffered items has reached {@code bufferMaxSize}. */
    public boolean isSaturated() {
        return size() >= this.bufferMaxSize;
    }

    /** Returns {@code true} when the number of buffered items is at or below {@code bufferMinSize}. */
    public boolean isExhausted() {
        return size() <= this.bufferMinSize;
    }

    /** Returns {@code true} when no items are buffered. */
    public boolean isEmpty() {
        return this.tail == this.head;
    }

    /** Returns the number of items currently buffered. */
    public int size() {
        return this.tail - this.head;
    }

    /** Stores {@code item} at the write position and advances the write counter. */
    private void doAdd(T item) {
        this.elements[this.tail++ & (this.elements.length - 1)] = item;
    }

    @Override // io.activej.datastream.dsl.HasStreamInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.activej.datastream.dsl.HasStreamOutput
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    /**
     * Re-evaluates whether the input should be suspended or resumed.
     *
     * <ul>
     *   <li>saturated: suspend the input;</li>
     *   <li>exhausted, empty and output ready: resume the input directly into the
     *       output's data acceptor, bypassing the buffer;</li>
     *   <li>exhausted otherwise: resume the input into the buffering acceptor;</li>
     *   <li>between the two thresholds: leave the input state untouched.</li>
     * </ul>
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * {@code private}; visibility is kept as-is here.
     */
    public void sync() {
        if (isSaturated()) {
            this.input.suspend();
            return;
        }
        if (!isExhausted()) {
            return;
        }
        if (isEmpty() && this.output.isReady()) {
            this.input.resume(this.output.getDataAcceptor());
        } else {
            this.input.resume(this.toBuffer);
        }
    }
}
