package io.activej.datastream.csp;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.Utils;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelOutput;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/datastream/csp/ChannelSerializer.class */
/**
 * Stream consumer that serializes every received item with a {@link BinarySerializer}
 * and forwards the resulting {@link ByteBuf}s to a {@link ChannelConsumer}.
 *
 * <p>Each item is written as a frame: a size header followed by the encoded item bytes.
 * The header holds the item size in 7-bit groups, least significant group first, with the
 * high bit set on every header byte except the last one (see {@code Input#writeSize}).
 *
 * <p>Filled buffers are queued in {@code bufs} and sent to the output one by one; while a
 * previously queued buffer is still pending, the upstream is suspended.
 *
 * @param <T> type of the items being serialized
 */
public final class ChannelSerializer<T> extends AbstractStreamConsumer<T> implements WithStreamToChannel<ChannelSerializer<T>, T, ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(ChannelSerializer.class);
    private static final boolean CHECK = Checks.isEnabled(ChannelSerializer.class);

    /** Exclusive upper bound (256 MB) for the size of one serialized item, enforced only when checks are enabled. */
    private static final int MAX_SIZE_INT = 1 << 28;

    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = MemSize.kilobytes(16);

    private final BinarySerializer<T> serializer;

    /** Bytes sent to the output right before end-of-stream, or {@code null} to send nothing extra. */
    @Nullable
    private byte[] explicitEndOfStream;

    /** Interval between automatic flushes, or {@code null} to disable automatic flushing. */
    @Nullable
    private Duration autoFlushInterval;

    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;

    /** Invoked when the serializer fails on an item; by default the failure closes this consumer. */
    private BiConsumer<T, Throwable> serializationErrorHandler = (item, e) -> closeEx(e);

    private Input input;
    private ChannelConsumer<ByteBuf> output;

    /** Buffers that are filled and waiting to be accepted by the output. */
    private final ArrayDeque<ByteBuf> bufs = new ArrayDeque<>();

    /** {@code true} while an asynchronous write to the output is in flight. */
    private boolean sending;

    /**
     * Data acceptor that encodes items into the current buffer and hands full buffers
     * over to the enclosing serializer.
     */
    public final class Input implements StreamDataAcceptor<T> {
        private final BinarySerializer<T> serializer;
        private ByteBuf buf = null;

        /** Largest serialized item size observed so far. */
        private int estimatedDataSize;

        /**
         * Number of header bytes reserved in front of each item. Starts at 1 so that an item
         * which encodes to zero bytes before any re-estimation still gets a valid one-byte header.
         */
        private int estimatedHeaderSize = 1;

        /**
         * Free space the current buffer must have before an item is encoded into it.
         * Starts at 1 to match the initial one-byte header.
         */
        private int requiredRemainingSize = 1;

        private final int initialBufferSize;

        /**
         * Auto flush interval in milliseconds: {@code <= 0} flushes at the end of the current
         * eventloop tick, {@code Integer.MAX_VALUE} never schedules a flush.
         */
        private final int autoFlushIntervalMillis;

        private boolean flushPosted;
        private final BiConsumer<T, Throwable> serializationErrorHandler;

        /**
         * @param binarySerializer serializer used to encode items
         * @param i                initial (and minimal) size of allocated buffers, in bytes
         * @param biConsumer       handler invoked when encoding of an item fails
         */
        public Input(@NotNull BinarySerializer<T> binarySerializer, int i, BiConsumer<T, Throwable> biConsumer) {
            this.serializer = binarySerializer;
            this.initialBufferSize = i;
            Duration interval = ChannelSerializer.this.autoFlushInterval;
            if (interval == null) {
                this.autoFlushIntervalMillis = Integer.MAX_VALUE;
            } else {
                // Clamp instead of casting: a plain (int) cast wraps around for intervals longer
                // than Integer.MAX_VALUE ms and could turn a very long interval into a negative
                // one, which would be treated as "flush on every tick".
                long millis = interval.toMillis();
                this.autoFlushIntervalMillis = (int) Math.max(0L, Math.min(millis, (long) Integer.MAX_VALUE));
            }
            this.serializationErrorHandler = biConsumer;
        }

        /**
         * Encodes the item into the current buffer as a size header followed by the item bytes.
         * If the buffer turns out to be too small, a larger one is allocated and encoding is retried.
         * Any other serializer failure is passed to the serialization error handler and the item
         * is not written.
         */
        @Override // io.activej.datastream.StreamDataAcceptor
        public void accept(T t) {
            int frameStart;
            int dataStart;
            int dataEnd;
            while (true) {
                if (this.buf == null || this.buf.writeRemaining() < this.requiredRemainingSize) {
                    ensureBuffer();
                }
                frameStart = this.buf.tail();
                // Leave room for the header; it is filled in once the actual size is known.
                dataStart = frameStart + this.estimatedHeaderSize;
                try {
                    dataEnd = this.serializer.encode(this.buf.array(), dataStart, t);
                    break;
                } catch (ArrayIndexOutOfBoundsException e) {
                    // The item did not fit into the buffer: retry with a bigger one.
                    enlargeBuffer();
                } catch (Exception e) {
                    onSerializationError(t, e);
                    return;
                }
            }
            this.buf.tail(dataEnd);
            int dataSize = dataEnd - dataStart;
            if (dataSize > this.estimatedDataSize) {
                reestimate(frameStart, dataStart, dataSize);
            }
            // reestimate() may have replaced the buffer, so the array is re-read here.
            writeSize(this.buf.array(), frameStart, dataSize);
        }

        /**
         * Writes {@code size} into exactly {@code estimatedHeaderSize} bytes starting at {@code pos}:
         * 7 bits per byte, least significant group first, high bit set on all bytes but the last.
         */
        private void writeSize(byte[] array, int pos, int size) {
            if (this.estimatedHeaderSize == 1) {
                array[pos] = (byte) size;
                return;
            }
            array[pos] = (byte) ((size & 127) | 128);
            int rest = size >>> 7;
            if (this.estimatedHeaderSize == 2) {
                array[pos + 1] = (byte) rest;
                return;
            }
            array[pos + 1] = (byte) ((rest & 127) | 128);
            rest >>>= 7;
            if (this.estimatedHeaderSize == 3) {
                array[pos + 2] = (byte) rest;
            } else {
                assert this.estimatedHeaderSize == 4;
                array[pos + 2] = (byte) ((rest & 127) | 128);
                array[pos + 3] = (byte) (rest >>> 7);
            }
        }

        /** Flushes the current buffer, allocates a fresh one and posts a flush unless one is already posted. */
        private void ensureBuffer() {
            flush();
            this.buf = ByteBufPool.allocate(Math.max(this.initialBufferSize, this.requiredRemainingSize));
            if (!this.flushPosted) {
                postFlush();
            }
        }

        /** Flushes the current buffer and allocates one with roughly 1.5 times its remaining capacity. */
        private void enlargeBuffer() {
            int remaining = this.buf.writeRemaining();
            flush();
            this.buf = ByteBufPool.allocate(Math.max(this.initialBufferSize, remaining + (remaining >>> 1) + 1));
        }

        /**
         * Raises the size estimates to {@code dataSize} and, if the header now needs more bytes
         * than were reserved for the item just written, moves that item to make room.
         */
        private void reestimate(int frameStart, int dataStart, int dataSize) {
            if (ChannelSerializer.CHECK) {
                Checks.checkArgument(dataSize < ChannelSerializer.MAX_SIZE_INT, "Serialized data size exceeds 256MB");
            }
            this.estimatedDataSize = dataSize;
            this.estimatedHeaderSize = ChannelSerializer.varIntSize(this.estimatedDataSize);
            // Header + data + 25% slack.
            this.requiredRemainingSize = this.estimatedHeaderSize + this.estimatedDataSize + (this.estimatedDataSize >>> 2);
            ensureHeaderSize(frameStart, dataStart, dataSize);
        }

        /**
         * Shifts the item bytes forward when the space reserved for the header
         * ({@code dataStart - frameStart}) is smaller than the current {@code estimatedHeaderSize},
         * moving to a newly allocated buffer when the current one cannot hold the shifted item.
         */
        private void ensureHeaderSize(int frameStart, int dataStart, int dataSize) {
            int reservedHeaderSize = dataStart - frameStart;
            if (reservedHeaderSize == this.estimatedHeaderSize) {
                return;
            }
            int shift = this.estimatedHeaderSize - reservedHeaderSize;
            assert shift > 0;
            int newDataStart = dataStart + shift;
            int newTail = newDataStart + dataSize;
            if (newTail < this.buf.array().length) {
                byte[] array = this.buf.array();
                System.arraycopy(array, dataStart, array, newDataStart, dataSize);
            } else {
                ByteBuf old = this.buf;
                this.buf = ByteBufPool.allocate(Math.max(this.initialBufferSize, newTail));
                // Carry over the frames written before this item, then the item itself.
                System.arraycopy(old.array(), 0, this.buf.array(), 0, frameStart);
                System.arraycopy(old.array(), dataStart, this.buf.array(), newDataStart, dataSize);
                old.recycle();
            }
            this.buf.tail(newTail);
        }

        /**
         * Marks a flush as posted and schedules it according to the auto flush interval.
         * With an interval of {@code Integer.MAX_VALUE} nothing is scheduled.
         */
        private void postFlush() {
            this.flushPosted = true;
            if (this.autoFlushIntervalMillis <= 0) {
                ChannelSerializer.this.eventloop.postLast(() -> {
                    this.flushPosted = false;
                    flush();
                });
            } else if (this.autoFlushIntervalMillis < Integer.MAX_VALUE) {
                ChannelSerializer.this.eventloop.delayBackground(this.autoFlushIntervalMillis, () -> {
                    this.flushPosted = false;
                    flush();
                });
            }
        }

        /**
         * Hands the current buffer over to the enclosing serializer for sending.
         * A buffer without readable bytes is recycled instead; the upstream is suspended
         * when earlier buffers are still queued.
         */
        public void flush() {
            if (this.buf == null) {
                return;
            }
            if (!this.buf.canRead()) {
                this.buf.recycle();
                this.buf = null;
                return;
            }
            if (!ChannelSerializer.this.bufs.isEmpty()) {
                ChannelSerializer.this.suspend();
            }
            ChannelSerializer.this.bufs.add(this.buf);
            this.buf = null;
            ChannelSerializer.this.send();
        }

        /** Reports an item that could not be serialized to the configured handler. */
        private void onSerializationError(T item, Exception e) {
            this.serializationErrorHandler.accept(item, e);
        }
    }

    private ChannelSerializer(BinarySerializer<T> binarySerializer) {
        this.serializer = binarySerializer;
    }

    /** Creates a serializer stream consumer that encodes items with the given serializer. */
    public static <T> ChannelSerializer<T> create(BinarySerializer<T> binarySerializer) {
        return new ChannelSerializer<>(binarySerializer);
    }

    /** Sets the initial (and minimal) size of the buffers allocated for serialized data. */
    public ChannelSerializer<T> withInitialBufferSize(MemSize memSize) {
        this.initialBufferSize = memSize;
        return this;
    }

    /**
     * Sets the interval between automatic flushes of partially filled buffers.
     * {@code null} disables automatic flushing; a zero interval flushes at the end of the
     * current eventloop tick.
     */
    public ChannelSerializer<T> withAutoFlushInterval(@Nullable Duration duration) {
        this.autoFlushInterval = duration;
        return this;
    }

    /** Makes serialization failures be logged as warnings instead of closing this consumer. */
    public ChannelSerializer<T> withSkipSerializationErrors() {
        return withSerializationErrorHandler((item, e) ->
                logger.warn("Skipping serialization error for {} in {}", item, this, e));
    }

    /** Sets the handler that receives an item and the exception when its serialization fails. */
    public ChannelSerializer<T> withSerializationErrorHandler(BiConsumer<T, Throwable> biConsumer) {
        this.serializationErrorHandler = biConsumer;
        return this;
    }

    /** Enables the explicit end-of-stream marker (a single zero byte). */
    public ChannelSerializer<T> withExplicitEndOfStream() {
        return withExplicitEndOfStream(true);
    }

    /** Enables (a single zero byte) or disables the explicit end-of-stream marker. */
    public ChannelSerializer<T> withExplicitEndOfStream(boolean z) {
        return withExplicitEndOfStream(z ? new byte[]{0} : null);
    }

    /** Sets the bytes sent to the output right before end-of-stream; {@code null} sends nothing extra. */
    public ChannelSerializer<T> withExplicitEndOfStream(@Nullable byte[] bArr) {
        this.explicitEndOfStream = bArr;
        return this;
    }

    /** Returns the output to which a {@link ChannelConsumer} is bound; binding resumes the stream. */
    public ChannelOutput<ByteBuf> getOutput() {
        return channelConsumer -> {
            this.output = channelConsumer;
            resume(this.input);
        };
    }

    @Override // io.activej.datastream.AbstractStreamConsumer
    protected void onInit() {
        this.input = new Input(this.serializer, this.initialBufferSize.toInt(), this.serializationErrorHandler);
    }

    @Override // io.activej.datastream.AbstractStreamConsumer
    protected void onStarted() {
        // Without an output there is nowhere to send data yet; getOutput() resumes later.
        if (this.output != null) {
            resume(this.input);
        }
    }

    @Override // io.activej.datastream.AbstractStreamConsumer
    protected void onEndOfStream() {
        this.input.flush();
        send();
    }

    @Override // io.activej.datastream.AbstractStreamConsumer
    protected void onError(Throwable th) {
        // The output is assigned only in getOutput(); an error may arrive before that.
        if (this.output != null) {
            this.output.closeEx(th);
        }
    }

    @Override // io.activej.datastream.AbstractStreamConsumer
    protected void onCleanup() {
        this.bufs.forEach(ByteBuf::recycle);
        this.bufs.clear();
        this.input.buf = Utils.nullify(this.input.buf, ByteBuf::recycle);
    }

    /**
     * Sends queued buffers to the output. When the queue is empty, either resumes the upstream
     * or, at end of stream, sends the optional explicit end-of-stream bytes followed by
     * end-of-stream and acknowledges. Does nothing while a previous send is still in flight.
     */
    public void send() {
        if (this.sending) {
            return;
        }
        if (this.bufs.isEmpty()) {
            if (!isEndOfStream()) {
                resume(this.input);
                return;
            }
            this.sending = true;
            Promise.complete()
                    .then(() -> this.explicitEndOfStream != null ?
                            this.output.accept(ByteBuf.wrapForReading(this.explicitEndOfStream)) :
                            Promise.complete())
                    .then(this.output::acceptEndOfStream)
                    .whenResult(this::acknowledge)
                    // Same failure handling as for regular buffers below: a failed write
                    // closes this consumer instead of being silently dropped.
                    .whenException(this::closeEx);
            return;
        }
        this.sending = true;
        while (!this.bufs.isEmpty()) {
            Promise<Void> accepted = this.output.accept(this.bufs.poll());
            if (!accepted.isResult()) {
                // Not (successfully) completed yet: continue once it is, or close on failure.
                accepted
                        .whenResult(() -> {
                            this.sending = false;
                            send();
                        })
                        .whenException(this::closeEx);
                return;
            }
        }
        this.sending = false;
        send();
    }

    /**
     * Returns the number of 7-bit groups needed to represent {@code i};
     * yields 1 for {@code 0} and 5 for values of {@code 2^28} and above.
     */
    public static int varIntSize(int i) {
        return 1 + ((31 - Integer.numberOfLeadingZeros(i)) / 7);
    }
}
