package io.activej.http.stream;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufPool;
import io.activej.common.Preconditions;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;

/* loaded from: input_file:io/activej/http/stream/BufsConsumerChunkedEncoder.class */
/**
 * Communicating process that re-frames every readable {@link ByteBuf} taken from its input
 * as one chunk of the form {@code <hex-size> CR LF <payload> CR LF} and, once the input is
 * exhausted, emits the terminating {@code "0\r\n\r\n"} chunk followed by end-of-stream.
 *
 * <p>The process starts as soon as both an input (via {@link #getInput()}) and an output
 * (via {@link #getOutput()}) have been attached, in either order.
 */
public final class BufsConsumerChunkedEncoder extends AbstractCommunicatingProcess implements WithChannelTransformer<BufsConsumerChunkedEncoder, ByteBuf, ByteBuf> {
    private static final byte CR = 13;
    private static final byte LF = 10;

    /** ASCII {@code "0\r\n\r\n"}: a zero-length chunk followed by an empty line. */
    private static final byte[] LAST_CHUNK_BYTES = {48, CR, LF, CR, LF};

    /** Source of payload buffers; {@code null} until the {@link #getInput()} binding is invoked. */
    private ChannelSupplier<ByteBuf> input;

    /** Sink for encoded chunks; {@code null} until the {@link #getOutput()} binding is invoked. */
    private ChannelConsumer<ByteBuf> output;

    private BufsConsumerChunkedEncoder() {
    }

    /**
     * Creates a new, not yet wired encoder.
     *
     * @return a fresh encoder instance
     */
    public static BufsConsumerChunkedEncoder create() {
        return new BufsConsumerChunkedEncoder();
    }

    /**
     * Returns the binding through which the payload supplier is attached.
     *
     * <p>The binding stores the supplier (passed through {@code sanitize}), starts the process
     * if the output is already attached, and returns the value of {@code getProcessCompletion()}.
     *
     * @return the input binding; invoking it a second time fails with the state check
     *         {@code "Input already set"}
     */
    public ChannelInput<ByteBuf> getInput() {
        return supplier -> {
            Preconditions.checkState(this.input == null, "Input already set");
            this.input = sanitize(supplier);
            startIfWired();
            return getProcessCompletion();
        };
    }

    /**
     * Returns the binding through which the chunk consumer is attached.
     *
     * <p>The binding stores the consumer (passed through {@code sanitize}) and starts the
     * process if the input is already attached.
     *
     * @return the output binding; invoking it a second time fails with the state check
     *         {@code "Output already set"}
     */
    public ChannelOutput<ByteBuf> getOutput() {
        return consumer -> {
            Preconditions.checkState(this.output == null, "Output already set");
            this.output = sanitize(consumer);
            startIfWired();
        };
    }

    /** Starts the process once both the input and the output have been attached. */
    private void startIfWired() {
        if (this.input != null && this.output != null) {
            startProcess();
        }
    }

    /** Verifies that both channels are attached before the process runs. */
    protected void beforeProcess() {
        Preconditions.checkState(this.input != null, "Input was not set");
        Preconditions.checkState(this.output != null, "Output was not set");
    }

    /**
     * Streams the input into the output: each buffer that passes {@code canRead()} is encoded
     * as a chunk, then the terminating chunk is written, then end-of-stream is signalled, and
     * finally the process is marked complete.
     */
    protected void doProcess() {
        // Buffers failing canRead() are dropped before encoding. A zero-length payload would
        // be encoded as "0\r\n\r\n", which is byte-for-byte the terminating chunk.
        this.input.filter(ByteBuf::canRead)
                .streamTo(ChannelConsumer.of(payload -> this.output.accept(encodeBuf(payload))))
                .then(() -> this.output.accept(ByteBuf.wrapForReading(LAST_CHUNK_BYTES)))
                .then(() -> this.output.acceptEndOfStream())
                .whenResult(this::completeProcess);
    }

    /**
     * Wraps the readable bytes of {@code byteBuf} into a single chunk:
     * lowercase hex size, CR LF, the payload, CR LF.
     *
     * <p>The source buffer is recycled after its bytes have been copied.
     *
     * @param byteBuf payload buffer; its readable bytes become the chunk body
     * @return a newly allocated buffer holding the complete chunk
     */
    private static ByteBuf encodeBuf(ByteBuf byteBuf) {
        int payloadSize = byteBuf.readRemaining();
        char[] hexDigits = Integer.toHexString(payloadSize).toCharArray();
        int headerLength = hexDigits.length + 2;

        // Exact fit: size line (hex digits + CRLF) + payload + trailing CRLF.
        ByteBuf chunk = ByteBufPool.allocate(headerLength + payloadSize + 2);
        byte[] raw = chunk.array();

        // The size line is written straight into the backing array; hex digits are ASCII,
        // so the narrowing cast is lossless.
        int pos = 0;
        for (char digit : hexDigits) {
            raw[pos++] = (byte) digit;
        }
        raw[pos++] = CR;
        raw[pos] = LF;
        // Advance the write position past the bytes written directly into the array.
        chunk.tail(headerLength);

        chunk.put(byteBuf);
        byteBuf.recycle();

        chunk.writeByte(CR);
        chunk.writeByte(LF);
        return chunk;
    }

    /**
     * Propagates closure to whichever channels have been attached.
     *
     * <p>Either field is still {@code null} if its binding was never invoked, so each side is
     * closed only when present; an unwired side must not prevent the other from being closed.
     * NOTE(review): whether the base class can invoke this before both sides are wired is not
     * visible from this file; the guards are defensive.
     *
     * @param th the cause passed on to both channels
     */
    protected void doClose(Throwable th) {
        if (this.input != null) {
            this.input.closeEx(th);
        }
        if (this.output != null) {
            this.output.closeEx(th);
        }
    }
}
