package io.datakernel.csp.process;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.common.inspector.BaseInspector;
import io.datakernel.common.parse.ParseException;
import io.datakernel.common.parse.TruncatedDataException;
import io.datakernel.csp.AbstractCommunicatingProcess;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelInput;
import io.datakernel.csp.ChannelOutput;
import io.datakernel.csp.binary.BinaryChannelSupplier;
import io.datakernel.csp.dsl.WithBinaryChannelInput;
import io.datakernel.csp.dsl.WithChannelTransformer;
import io.datakernel.promise.Promise;
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;
import net.jpountz.util.SafeUtils;
import net.jpountz.xxhash.StreamingXXHash32;
import net.jpountz.xxhash.XXHashFactory;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/datakernel/csp/process/ChannelLZ4Decompressor.class */
/**
 * Communicating process that reads a framed, LZ4-compressed byte stream from a
 * {@link BinaryChannelSupplier} and pushes the decompressed blocks to a
 * {@code ChannelConsumer<ByteBuf>}.
 *
 * <p>The stream is a sequence of blocks. Each block starts with a header of
 * {@link #HEADER_LENGTH} bytes (magic, token byte, compressed length, original
 * length, checksum; integers are little-endian) followed by
 * {@code compressedLen} bytes of payload. A header whose original length is
 * zero terminates the stream.
 */
public final class ChannelLZ4Decompressor extends AbstractCommunicatingProcess implements WithChannelTransformer<ChannelLZ4Decompressor, ByteBuf, ByteBuf>, WithBinaryChannelInput<ChannelLZ4Decompressor> {
    public static final int HEADER_LENGTH = ChannelLZ4Compressor.HEADER_LENGTH;
    public static final ParseException STREAM_IS_CORRUPTED = new ParseException(ChannelLZ4Decompressor.class, "Stream is corrupted");

    /** High nibble of the token byte selects the compression method. */
    private static final int COMPRESSION_METHOD_MASK = 0xF0;
    /** Low nibble of the token byte is added to {@link #COMPRESSION_LEVEL_BASE}. */
    private static final int COMPRESSION_LEVEL_MASK = 0x0F;
    /** Payload is stored uncompressed and is copied verbatim. */
    private static final int COMPRESSION_METHOD_RAW = 0x10;
    /** Payload is an LZ4 block handled by the {@link LZ4FastDecompressor}. */
    private static final int COMPRESSION_METHOD_LZ4 = 0x20;
    /** The largest accepted original length is {@code 1 << (COMPRESSION_LEVEL_BASE + level)}. */
    private static final int COMPRESSION_LEVEL_BASE = 10;
    /** Seed of the streaming XXHash32 used to verify decompressed blocks. */
    private static final int DEFAULT_SEED = 0x9747b28c;

    private final LZ4FastDecompressor decompressor;
    private final StreamingXXHash32 checksum;
    private ByteBufQueue bufs;
    private BinaryChannelSupplier input;
    private ChannelConsumer<ByteBuf> output;
    /** Reused for every block; overwritten by {@link #readHeader}. */
    private final Header header = new Header();

    @Nullable
    private Inspector inspector;

    /** Parsed fields of a single block header. */
    /* loaded from: input_file:io/datakernel/csp/process/ChannelLZ4Decompressor$Header.class */
    public static final class Header {
        /** Length of the block after decompression. */
        public int originalLen;
        /** Length of the block payload as stored in the stream. */
        public int compressedLen;
        /** High nibble of the token byte (raw or LZ4). */
        public int compressionMethod;
        /** Expected XXHash32 value of the decompressed block. */
        public int check;
        /** Set when the header has zero original length, i.e. terminates the stream. */
        public boolean finished;
    }

    /** Callback notified about every successfully decompressed block. */
    /* loaded from: input_file:io/datakernel/csp/process/ChannelLZ4Decompressor$Inspector.class */
    public interface Inspector extends BaseInspector<Inspector> {
        /**
         * Invoked after a block has been decompressed and verified, before it
         * is passed to the output.
         *
         * @param self      the decompressor reporting the block
         * @param header    header of the block (the instance is reused between blocks)
         * @param inputBuf  compressed payload; it is recycled right after this call returns
         * @param outputBuf decompressed data
         */
        void onBlock(ChannelLZ4Decompressor self, Header header, ByteBuf inputBuf, ByteBuf outputBuf);
    }

    private ChannelLZ4Decompressor(LZ4FastDecompressor decompressor, StreamingXXHash32 checksum) {
        this.decompressor = decompressor;
        this.checksum = checksum;
    }

    /** Creates a decompressor backed by the fastest available LZ4 and XXHash implementations. */
    public static ChannelLZ4Decompressor create() {
        return create(LZ4Factory.fastestInstance().fastDecompressor(), XXHashFactory.fastestInstance());
    }

    /**
     * Creates a decompressor using the given LZ4 decompressor and a streaming
     * XXHash32 obtained from {@code xxHashFactory}.
     */
    public static ChannelLZ4Decompressor create(LZ4FastDecompressor decompressor, XXHashFactory xxHashFactory) {
        return new ChannelLZ4Decompressor(decompressor, xxHashFactory.newStreamingHash32(DEFAULT_SEED));
    }

    /** Sets the inspector notified about each decompressed block and returns {@code this}. */
    public ChannelLZ4Decompressor withInspector(Inspector inspector) {
        this.inspector = inspector;
        return this;
    }

    /**
     * Returns the input endpoint. Wiring a supplier stores it together with its
     * buffer queue and starts the process once the output is wired as well.
     */
    // NOTE(review): the method name and return type look like decompiler artefacts
    // (see "renamed from" below) - confirm against the interface before relying on them.
    @Override // io.datakernel.csp.dsl.HasChannelInput
    /* renamed from: getInput */
    public ChannelInput<ByteBuf> getInput2() {
        return binaryChannelSupplier -> {
            this.input = binaryChannelSupplier;
            this.bufs = binaryChannelSupplier.getBufs();
            if (this.input != null && this.output != null) {
                startProcess();
            }
            return getProcessCompletion();
        };
    }

    /**
     * Returns the output endpoint. Wiring a consumer stores its sanitized form
     * and starts the process once the input is wired as well.
     */
    @Override // io.datakernel.csp.dsl.HasChannelOutput
    public ChannelOutput<ByteBuf> getOutput() {
        return channelConsumer -> {
            this.output = sanitize(channelConsumer);
            if (this.input != null && this.output != null) {
                startProcess();
            }
        };
    }

    /** Entry point of the process: begins with the first block header. */
    @Override // io.datakernel.csp.AbstractCommunicatingProcess
    protected void doProcess() {
        processHeader();
    }

    /**
     * Reads and validates the next block header.
     *
     * <p>While fewer than {@link #HEADER_LENGTH} bytes are buffered, the bytes
     * that are available are compared with the magic so that a corrupted stream
     * is rejected without waiting for more data. After a terminating header the
     * input is checked for end of stream, {@code null} is passed to the output
     * and the process completes; otherwise the block body is processed.
     */
    public void processHeader() {
        if (!this.bufs.hasRemainingBytes(HEADER_LENGTH)) {
            int available = Math.min(this.bufs.remainingBytes(), ChannelLZ4Compressor.MAGIC.length);
            for (int i = 0; i < available; i++) {
                if (this.bufs.peekByte(i) != ChannelLZ4Compressor.MAGIC[i]) {
                    close(STREAM_IS_CORRUPTED);
                    return;
                }
            }
            this.input.needMoreData()
                    .thenEx(ChannelLZ4Decompressor::checkTruncatedDataException)
                    .thenEx((value, e) -> sanitize(value, e))
                    .whenResult(value -> processHeader());
            return;
        }

        ByteBuf headerBuf = this.bufs.takeExactSize(HEADER_LENGTH);
        try {
            readHeader(this.header, headerBuf.array(), headerBuf.head());
        } catch (ParseException e) {
            close(e);
            return;
        } finally {
            // Release the header buffer on the failure path too; previously it
            // was only closed when parsing succeeded.
            headerBuf.close();
        }

        if (this.header.finished) {
            this.input.endOfStream()
                    .thenEx((value, e) -> sanitize(value, e))
                    .then(value -> this.output.accept(null))
                    .whenResult(value -> completeProcess());
        } else {
            processBody();
        }
    }

    /**
     * Reads the payload announced by the current header, decompresses and
     * verifies it, notifies the inspector (if any) and passes the result to the
     * output. When the output accepts the block, the next header is processed.
     */
    public void processBody() {
        if (!this.bufs.hasRemainingBytes(this.header.compressedLen)) {
            this.input.needMoreData()
                    .thenEx(ChannelLZ4Decompressor::checkTruncatedDataException)
                    .thenEx((value, e) -> sanitize(value, e))
                    .whenResult(value -> processBody());
            return;
        }

        ByteBuf inputBuf = this.bufs.takeExactSize(this.header.compressedLen);
        ByteBuf outputBuf;
        try {
            outputBuf = decompress(this.decompressor, this.checksum, this.header, inputBuf.array(), inputBuf.head());
            if (this.inspector != null) {
                this.inspector.onBlock(this, this.header, inputBuf, outputBuf);
            }
        } catch (ParseException e) {
            close(e);
            return;
        } finally {
            // The only place the input buffer is released, so it is recycled
            // exactly once on both the success and the failure path.
            inputBuf.recycle();
        }

        this.output.accept(outputBuf).whenResult(value -> processHeader());
    }

    /** Propagates the close reason to both the input and the output. */
    @Override // io.datakernel.csp.AbstractCommunicatingProcess
    protected void doClose(Throwable th) {
        this.input.close(th);
        this.output.close(th);
    }

    /**
     * Parses a block header located at {@code bytes[off..]} into {@code header}.
     *
     * <p>Layout after the magic: one token byte (method in the high nibble,
     * level offset in the low nibble), then three little-endian ints -
     * compressed length, original length and checksum.
     *
     * @throws ParseException {@link #STREAM_IS_CORRUPTED} if the magic, the
     *                        method, the lengths or the terminator checksum are invalid
     */
    private static void readHeader(Header header, byte[] bytes, int off) throws ParseException {
        for (int i = 0; i < ChannelLZ4Compressor.MAGIC_LENGTH; i++) {
            if (bytes[off + i] != ChannelLZ4Compressor.MAGIC[i]) {
                throw STREAM_IS_CORRUPTED;
            }
        }

        int tokenOffset = off + ChannelLZ4Compressor.MAGIC_LENGTH;
        int token = bytes[tokenOffset] & 0xFF;
        header.compressionMethod = token & COMPRESSION_METHOD_MASK;
        int compressionLevel = COMPRESSION_LEVEL_BASE + (token & COMPRESSION_LEVEL_MASK);
        if (header.compressionMethod != COMPRESSION_METHOD_RAW && header.compressionMethod != COMPRESSION_METHOD_LZ4) {
            throw STREAM_IS_CORRUPTED;
        }

        header.compressedLen = SafeUtils.readIntLE(bytes, tokenOffset + 1);
        header.originalLen = SafeUtils.readIntLE(bytes, tokenOffset + 5);
        header.check = SafeUtils.readIntLE(bytes, tokenOffset + 9);

        boolean lengthsInvalid = header.originalLen > (1 << compressionLevel)
                || header.originalLen < 0
                || header.compressedLen < 0
                // Either both lengths are zero (terminator) or neither is.
                || (header.originalLen == 0 && header.compressedLen != 0)
                || (header.originalLen != 0 && header.compressedLen == 0)
                // Raw blocks are stored verbatim, so both lengths must match.
                || (header.compressionMethod == COMPRESSION_METHOD_RAW && header.originalLen != header.compressedLen);
        if (lengthsInvalid) {
            throw STREAM_IS_CORRUPTED;
        }

        if (header.originalLen == 0) {
            if (header.check != 0) {
                throw STREAM_IS_CORRUPTED;
            }
            header.finished = true;
        }
    }

    /**
     * Decompresses one block payload located at {@code bytes[off..]} into a
     * newly allocated pooled buffer and verifies its checksum.
     *
     * @return buffer holding {@code header.originalLen} bytes of decompressed data
     * @throws ParseException if the payload cannot be decompressed, the number
     *                        of consumed bytes differs from {@code header.compressedLen},
     *                        or the checksum does not match {@code header.check}
     */
    private static ByteBuf decompress(LZ4FastDecompressor decompressor, StreamingXXHash32 checksum, Header header, byte[] bytes, int off) throws ParseException {
        ByteBuf outputBuf = ByteBufPool.allocate(header.originalLen);
        boolean succeeded = false;
        try {
            outputBuf.tail(header.originalLen);
            switch (header.compressionMethod) {
                case COMPRESSION_METHOD_RAW:
                    System.arraycopy(bytes, off, outputBuf.array(), 0, header.originalLen);
                    break;
                case COMPRESSION_METHOD_LZ4:
                    try {
                        int bytesRead = decompressor.decompress(bytes, off, outputBuf.array(), 0, header.originalLen);
                        if (bytesRead != header.compressedLen) {
                            throw STREAM_IS_CORRUPTED;
                        }
                    } catch (LZ4Exception e) {
                        throw new ParseException(ChannelLZ4Decompressor.class, "Stream is corrupted", e);
                    }
                    break;
                default:
                    throw STREAM_IS_CORRUPTED;
            }

            checksum.reset();
            checksum.update(outputBuf.array(), 0, header.originalLen);
            if (checksum.getValue() != header.check) {
                throw STREAM_IS_CORRUPTED;
            }
            succeeded = true;
            return outputBuf;
        } finally {
            // Give the pooled buffer back when the block is rejected; the
            // caller never sees it in that case and could not release it.
            if (!succeeded) {
                outputBuf.recycle();
            }
        }
    }

    /**
     * Maps the result of a "need more data" request: success passes through,
     * an unexpected end of stream becomes a {@link TruncatedDataException},
     * and any other failure is propagated unchanged.
     */
    private static Promise<Void> checkTruncatedDataException(Void value, Throwable th) {
        if (th == null) {
            return Promise.complete();
        }
        if (th == BinaryChannelSupplier.UNEXPECTED_END_OF_STREAM_EXCEPTION) {
            return Promise.ofException(new TruncatedDataException(ChannelLZ4Decompressor.class, "Unexpected end-of-stream"));
        }
        return Promise.ofException(th);
    }
}
