package io.datakernel.stream.processor;

import io.datakernel.annotation.Nullable;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.serializer.BufferSerializer;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.util.MemSize;
import io.datakernel.util.Preconditions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/processor/StreamBinarySerializer.class */
public final class StreamBinarySerializer<T> implements StreamTransformer<T, ByteBuf> {
    private static final ArrayIndexOutOfBoundsException OUT_OF_BOUNDS_EXCEPTION = new ArrayIndexOutOfBoundsException();
    public static final MemSize DEFAULT_INITIAL_BUFFER_SIZE = MemSize.kilobytes(16);
    public static final MemSize MAX_SIZE_1 = MemSize.bytes(128);
    public static final MemSize MAX_SIZE_2 = MemSize.kilobytes(16);
    public static final MemSize MAX_SIZE_3 = MemSize.megabytes(2);
    public static final MemSize MAX_SIZE = MAX_SIZE_3;
    private final BufferSerializer<T> serializer;
    private Duration autoFlushInterval;
    private StreamBinarySerializer<T>.Input input;
    private StreamBinarySerializer<T>.Output output;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private MemSize initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE;
    private MemSize maxMessageSize = MAX_SIZE;
    private boolean skipSerializationErrors = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/stream/processor/StreamBinarySerializer$Input.class */
    public final class Input extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamBinarySerializer.this.output.flushAndClose();
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamBinarySerializer.this.output.closeWithError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/stream/processor/StreamBinarySerializer$Output.class */
    public final class Output extends AbstractStreamProducer<ByteBuf> implements StreamDataReceiver<T> {
        private final BufferSerializer<T> serializer;
        private final int initialBufferSize;
        private final int maxMessageSize;
        private final int headerSize;
        private ByteBuf outputBuf = ByteBuf.empty();
        private int estimatedMessageSize = 1;
        private final int autoFlushIntervalMillis;
        private boolean flushPosted;
        private final boolean skipSerializationErrors;
        static final /* synthetic */ boolean $assertionsDisabled;

        public Output(BufferSerializer<T> bufferSerializer, int i, int i2, @Nullable Duration duration, boolean z) {
            this.skipSerializationErrors = z;
            this.serializer = (BufferSerializer) Preconditions.checkNotNull(bufferSerializer);
            this.maxMessageSize = i2;
            this.headerSize = StreamBinarySerializer.varint32Size(i2 - 1);
            this.initialBufferSize = i;
            this.autoFlushIntervalMillis = duration == null ? -1 : (int) duration.toMillis();
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onSuspended() {
            StreamBinarySerializer.this.input.getProducer().suspend();
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void produce() {
            if (StreamBinarySerializer.this.input.getStatus() != StreamStatus.END_OF_STREAM) {
                StreamBinarySerializer.this.input.getProducer().produce(this);
            } else {
                flushAndClose();
            }
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onError(Throwable th) {
            StreamBinarySerializer.this.input.closeWithError(th);
        }

        private ByteBuf allocateBuffer() {
            return ByteBufPool.allocate(Math.max(this.initialBufferSize, this.headerSize + this.estimatedMessageSize + (this.estimatedMessageSize >>> 2)));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flush() {
            if (this.outputBuf.canRead()) {
                getLastDataReceiver().onData(this.outputBuf);
                this.estimatedMessageSize -= this.estimatedMessageSize >>> 8;
            } else {
                this.outputBuf.recycle();
            }
            this.outputBuf = ByteBuf.empty();
        }

        @Override // io.datakernel.stream.StreamDataReceiver
        public void onData(T t) {
            int writePosition;
            int i;
            while (true) {
                if (this.outputBuf.writeRemaining() < this.headerSize + this.estimatedMessageSize + (this.estimatedMessageSize >>> 2)) {
                    onFullBuffer();
                }
                writePosition = this.outputBuf.writePosition();
                i = writePosition + this.headerSize;
                this.outputBuf.writePosition(i);
                try {
                    this.serializer.serialize(this.outputBuf, t);
                    break;
                } catch (ArrayIndexOutOfBoundsException e) {
                    onUnderEstimate(t, writePosition);
                } catch (Exception e2) {
                    onSerializationError(t, writePosition, e2);
                    return;
                }
            }
            int writePosition2 = this.outputBuf.writePosition() - i;
            if (writePosition2 > this.estimatedMessageSize) {
                if (writePosition2 >= this.maxMessageSize) {
                    onMessageOverflow(t, writePosition, writePosition2);
                    return;
                }
                this.estimatedMessageSize = writePosition2;
            }
            writeSize(this.outputBuf.array(), writePosition, writePosition2);
        }

        private void writeSize(byte[] bArr, int i, int i2) {
            if (this.headerSize == 1) {
                bArr[i] = (byte) i2;
                return;
            }
            bArr[i] = (byte) ((i2 & 127) | 128);
            int i3 = i2 >>> 7;
            if (this.headerSize == 2) {
                bArr[i + 1] = (byte) i3;
            } else {
                if (!$assertionsDisabled && this.headerSize != 3) {
                    throw new AssertionError();
                }
                bArr[i + 1] = (byte) ((i3 & 127) | 128);
                bArr[i + 2] = (byte) (i3 >>> 7);
            }
        }

        private void onFullBuffer() {
            flush();
            this.outputBuf = allocateBuffer();
            if (this.flushPosted) {
                return;
            }
            postFlush();
        }

        private void onSerializationError(T t, int i, Exception exc) {
            this.outputBuf.writePosition(i);
            handleSerializationError(exc);
        }

        private void onMessageOverflow(T t, int i, int i2) {
            this.outputBuf.writePosition(i);
            handleSerializationError(StreamBinarySerializer.OUT_OF_BOUNDS_EXCEPTION);
        }

        private void onUnderEstimate(T t, int i) {
            this.outputBuf.writePosition(i);
            int writeRemaining = this.outputBuf.writeRemaining();
            flush();
            this.outputBuf = ByteBufPool.allocate(Math.max(this.initialBufferSize, writeRemaining + (writeRemaining >>> 1) + 1));
        }

        private void handleSerializationError(Exception exc) {
            if (this.skipSerializationErrors) {
                StreamBinarySerializer.this.logger.warn("Skipping serialization error in {}", this, exc);
            } else {
                closeWithError(exc);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flushAndClose() {
            flush();
            StreamBinarySerializer.this.output.sendEndOfStream();
        }

        private void postFlush() {
            this.flushPosted = true;
            if (this.autoFlushIntervalMillis == -1) {
                return;
            }
            if (this.autoFlushIntervalMillis == 0) {
                this.eventloop.postLater(() -> {
                    this.flushPosted = false;
                    flush();
                });
            } else {
                this.eventloop.delayBackground(this.autoFlushIntervalMillis, () -> {
                    this.flushPosted = false;
                    flush();
                });
            }
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void cleanup() {
            if (this.outputBuf != null) {
                this.outputBuf.recycle();
                this.outputBuf = ByteBuf.empty();
            }
        }

        static {
            $assertionsDisabled = !StreamBinarySerializer.class.desiredAssertionStatus();
        }
    }

    private StreamBinarySerializer(BufferSerializer<T> bufferSerializer) {
        this.serializer = bufferSerializer;
        rebuild();
    }

    private void rebuild() {
        if (this.output != null && ((Output) this.output).outputBuf != null) {
            ((Output) this.output).outputBuf.recycle();
        }
        this.input = new Input();
        this.output = new Output(this.serializer, this.initialBufferSize.toInt(), this.maxMessageSize.toInt(), this.autoFlushInterval, this.skipSerializationErrors);
    }

    public static <T> StreamBinarySerializer<T> create(BufferSerializer<T> bufferSerializer) {
        return new StreamBinarySerializer<>(bufferSerializer);
    }

    public StreamBinarySerializer<T> withInitialBufferSize(MemSize memSize) {
        this.initialBufferSize = memSize;
        rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withMaxMessageSize(MemSize memSize) {
        this.maxMessageSize = memSize;
        rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withAutoFlushInterval(@Nullable Duration duration) {
        this.autoFlushInterval = duration;
        rebuild();
        return this;
    }

    public StreamBinarySerializer<T> withSkipSerializationErrors() {
        return withSkipSerializationErrors(true);
    }

    public StreamBinarySerializer<T> withSkipSerializationErrors(boolean z) {
        this.skipSerializationErrors = z;
        rebuild();
        return this;
    }

    @Override // io.datakernel.stream.HasInput
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override // io.datakernel.stream.HasOutput
    public StreamProducer<ByteBuf> getOutput() {
        return this.output;
    }

    public void flush() {
        if (!this.output.getStatus().isOpen() || this.output.getLastDataReceiver() == null) {
            return;
        }
        this.output.flush();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int varint32Size(int i) {
        if ((i & (-128)) == 0) {
            return 1;
        }
        if ((i & (-16384)) == 0) {
            return 2;
        }
        if ((i & (-2097152)) == 0) {
            return 3;
        }
        return (i & (-268435456)) == 0 ? 4 : 5;
    }
}
