package jp.ad.sinet.stream.api;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import jp.ad.sinet.stream.api.compression.CompressionFactory;
import jp.ad.sinet.stream.crypto.CryptoDeserializerWrapper;
import jp.ad.sinet.stream.marshal.Unmarshaller;
import jp.ad.sinet.stream.packet.Packet;
import jp.ad.sinet.stream.spi.PluginMessageIO;
import jp.ad.sinet.stream.spi.PluginMessageWrapper;
import jp.ad.sinet.stream.spi.ReaderParameters;
import jp.ad.sinet.stream.utils.Pair;
import jp.ad.sinet.stream.utils.Timestamped;
import lombok.Generated;

/* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamBaseReader.class */
public class SinetStreamBaseReader<T, U extends PluginMessageIO> extends SinetStreamIO<U> {
    /** Topics this reader was configured with; wrapped as an unmodifiable list by the constructor. */
    private final List<String> topics;
    /** Receive timeout taken from the reader parameters. */
    private final Duration receiveTimeout;
    /** Deserializer that turns the (decompressed) payload bytes into a value of type {@code T}. */
    private final Deserializer<T> deserializer;
    /** Decompressor applied to the unmarshalled payload; an identity function when compression is off. */
    private final Decompressor decompressor;
    /** Deserializer applied to the raw bytes of every received plugin message. */
    private final Deserializer<Timestamped<T>> compositeDeserializer;
    /**
     * Per-thread holder of the payload sizes recorded by the most recent decompression on that
     * thread. Each thread lazily gets its own {@link CompressionMetrics} instance.
     *
     * <p>Declared {@code final} so the shared holder can never be swapped out after class
     * initialization.
     */
    private static final ThreadLocal<CompressionMetrics> compressionMetrics =
            ThreadLocal.withInitial(CompressionMetrics::new);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamBaseReader$CompressionMetrics.class */
    /**
     * Mutable pair of payload sizes: the length before decompression and the length after it.
     * Instances can only be created from within the enclosing class.
     */
    public static class CompressionMetrics {
        /** Payload length before decompression. */
        public int compLen;
        /** Payload length after decompression. */
        public int uncompLen;

        /** Private: instantiation is restricted to the enclosing class. */
        private CompressionMetrics() {
        }

        /**
         * Overwrites both recorded lengths.
         *
         * @param compressedLength length to store in {@link #compLen}
         * @param uncompressedLength length to store in {@link #uncompLen}
         */
        void set(int compressedLength, int uncompressedLength) {
            uncompLen = uncompressedLength;
            compLen = compressedLength;
        }
    }

    /* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamBaseReader$PktDeserializer.class */
    /**
     * Splits received bytes into a message body and a key version by decoding them as a
     * {@link Packet}.
     */
    public static class PktDeserializer implements Deserializer<Pair<byte[], Integer>> {
        /**
         * Decodes {@code bArr} as a packet.
         *
         * @param bArr bytes as received
         * @return the packet's message paired with the key version from its header, or the
         *     input bytes paired with {@code null} when {@link Packet#decode} yields {@code null}
         */
        @Override
        public Pair<byte[], Integer> deserialize(byte[] bArr) {
            final Packet packet = Packet.decode(bArr);
            if (packet != null) {
                final byte[] body = packet.getMessage();
                final Integer keyVersion = Integer.valueOf(packet.getHeader().getKeyVersion());
                return Pair.of(body, keyVersion);
            }
            // Not decodable as a packet: hand the bytes through untouched, without a key version.
            return Pair.of(bArr, null);
        }
    }

    /* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamBaseReader$ThruDeserializer.class */
    /**
     * Pass-through deserializer: performs no decoding and never reports a value for the
     * {@code Integer} half of the pair.
     */
    public static class ThruDeserializer implements Deserializer<Pair<byte[], Integer>> {
        /**
         * Returns the input unchanged.
         *
         * @param bArr bytes as received
         * @return {@code bArr} itself paired with {@code null}
         */
        @Override
        public Pair<byte[], Integer> deserialize(byte[] bArr) {
            final Integer absent = null;
            return Pair.of(bArr, absent);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:jp/ad/sinet/stream/api/SinetStreamBaseReader$UserDataOnlyDeserializer.class */
    /**
     * Adapts a plain value deserializer to the {@code Deserializer<Timestamped<V>>} shape
     * without any unmarshalling or decompression step.
     *
     * <p>The type parameter is named {@code V} so it no longer shadows the enclosing class's
     * {@code T}. The class is intentionally left as a (non-static) inner class so that existing
     * code instantiating it through an enclosing instance keeps compiling.
     *
     * @param <V> type of the deserialized value
     */
    public class UserDataOnlyDeserializer<V> implements Deserializer<Timestamped<V>> {
        /** Delegate that converts the raw bytes into the value; never reassigned. */
        private final Deserializer<V> deserializer;

        /**
         * @param deserializer delegate used to deserialize the raw bytes
         */
        public UserDataOnlyDeserializer(Deserializer<V> deserializer) {
            this.deserializer = deserializer;
        }

        /**
         * Deserializes the bytes with the delegate and attaches a timestamp of {@code 0}.
         *
         * @param bArr raw bytes of the message
         * @return the deserialized value wrapped with timestamp {@code 0L}
         */
        @Override
        public Timestamped<V> deserialize(byte[] bArr) {
            // 0 acts as "no timestamp": toMessage() replaces a zero timestamp with the one
            // carried by the plugin message wrapper.
            return new Timestamped<>(this.deserializer.deserialize(bArr), 0L);
        }
    }

    /**
     * Creates a reader from the given plugin target and reader parameters.
     *
     * <p>The assignment order below matters: {@code deserializer} and {@code decompressor} must
     * be set before {@code generateDeserializer} is called, because the composite deserializer
     * it builds reads both fields.
     *
     * @param u plugin message I/O instance, forwarded to the superclass constructor
     * @param readerParameters supplies the topics, receive timeout, value type and configuration
     * @param deserializer value deserializer to use; when {@code null}, the deserializer of the
     *     configured value type is used instead
     */
    public SinetStreamBaseReader(U u, ReaderParameters readerParameters, Deserializer<T> deserializer) {
        super(readerParameters, u);
        // Read-only view over the list returned by the parameters (a wrapper, not a copy).
        this.topics = Collections.unmodifiableList(readerParameters.getTopics());
        this.receiveTimeout = readerParameters.getReceiveTimeout();
        this.deserializer = setupDeserializer(readerParameters, deserializer);
        this.decompressor = setupDecompressor(readerParameters);
        // Must come last: relies on this.deserializer and this.decompressor being assigned.
        this.compositeDeserializer = generateDeserializer(readerParameters);
    }

    /**
     * Chooses the value deserializer for this reader.
     *
     * @param readerParameters consulted for the value type when no deserializer is supplied
     * @param deserializer explicitly supplied deserializer, may be {@code null}
     * @return {@code deserializer} when it is non-null, otherwise the deserializer of the
     *     value type configured in {@code readerParameters}
     */
    private Deserializer<T> setupDeserializer(ReaderParameters readerParameters, Deserializer<T> deserializer) {
        if (deserializer != null) {
            return deserializer;
        }
        return readerParameters.getValueType().getDeserializer();
    }

    /**
     * Builds the decompressor for this reader.
     *
     * @param readerParameters supplies the compression flag and the configuration map
     * @return an identity decompressor when data compression is disabled, otherwise the
     *     decompressor created by {@link CompressionFactory} from the {@code "compression"}
     *     configuration entry
     * @throws InvalidConfigurationException when a {@link ClassCastException} occurs while
     *     reading the {@code "compression"} entry or creating the decompressor
     */
    private Decompressor setupDecompressor(ReaderParameters readerParameters) {
        if (readerParameters.isDataCompression()) {
            try {
                // Raw Map on purpose: the cast is what detects a non-map configuration value.
                final Map compressionConfig = (Map) readerParameters.getConfig().get("compression");
                return CompressionFactory.createDecompressor(compressionConfig);
            } catch (ClassCastException e) {
                throw new InvalidConfigurationException("the parameter compression must be map", e);
            }
        }
        // Compression disabled: hand the bytes back unchanged.
        return data -> data;
    }

    /**
     * Builds the deserializer that {@code toMessage} applies to the raw bytes of each message.
     *
     * <p>In user-data-only mode the value deserializer is simply wrapped. Otherwise an envelope
     * deserializer is picked from the message format (2: pass-through, 3: packet) and combined,
     * via {@link CryptoDeserializerWrapper}, with a payload stage that unmarshals, decompresses,
     * records the compression sizes for the current thread and finally deserializes the value.
     *
     * @param readerParameters supplies the configuration handed to the crypto wrapper
     * @return the composite deserializer
     * @throws InvalidConfigurationException when the message format is neither 2 nor 3
     */
    private Deserializer<Timestamped<T>> generateDeserializer(ReaderParameters readerParameters) {
        if (isUserDataOnly()) {
            return new UserDataOnlyDeserializer(this.deserializer);
        }

        final int format = getMessageFormat();
        final Deserializer envelopeDeserializer;
        if (format == 2) {
            envelopeDeserializer = new ThruDeserializer();
        } else if (format == 3) {
            envelopeDeserializer = new PktDeserializer();
        } else {
            throw new InvalidConfigurationException("message_format=" + format + " is not supported");
        }

        final Unmarshaller unmarshaller = new Unmarshaller();
        return CryptoDeserializerWrapper.getDeserializer(readerParameters.getConfig(), envelopeDeserializer, payload -> {
            final Timestamped<byte[]> unmarshalled = unmarshaller.decode(payload);
            final byte[] compressed = unmarshalled.getValue();
            final byte[] plain = this.decompressor.decompress(compressed);
            // Remember both sizes so toMessage() can report them on this thread afterwards.
            compressionMetrics.get().set(compressed.length, plain.length);
            return new Timestamped(this.deserializer.deserialize(plain), unmarshalled.getTstamp());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Converts a message received from the plugin into an API-level {@link Message}.
     *
     * <p>The raw bytes are run through the composite deserializer, the metrics are updated with
     * the raw length and the compression sizes recorded for the current thread, and the
     * message timestamp is taken from the deserialized value unless that timestamp is
     * {@code 0}, in which case the wrapper's own timestamp is used.
     *
     * <p>NOTE(review): in user-data-only mode the composite deserializer does not touch the
     * thread-local compression sizes, so the values reported here are whatever was last
     * recorded on this thread - confirm this is intended.
     *
     * @param pluginMessageWrapper message as delivered by the plugin, may be {@code null}
     * @return the converted message, or {@code null} when the argument is {@code null}
     */
    public Message<T> toMessage(PluginMessageWrapper pluginMessageWrapper) {
        if (pluginMessageWrapper == null) {
            return null;
        }

        final Timestamped<byte[]> envelope = pluginMessageWrapper.getValue();
        final byte[] rawBytes = envelope.getValue();
        final Timestamped<T> decoded = getCompositeDeserializer().deserialize(rawBytes);

        final CompressionMetrics sizes = compressionMetrics.get();
        updateMetrics(rawBytes.length, sizes.compLen, sizes.uncompLen);

        // A zero timestamp means the payload carried none; fall back to the wrapper's timestamp.
        final long decodedTstamp = decoded.getTstamp();
        final long effectiveTstamp = decodedTstamp == 0 ? envelope.getTstamp() : decodedTstamp;

        return new Message<>(
                decoded.getValue(),
                pluginMessageWrapper.getTopic(),
                Long.valueOf(effectiveTstamp),
                pluginMessageWrapper.getRaw());
    }

    /**
     * Returns all configured topics as one string.
     *
     * @return the topic names joined with {@code ","}, in list order
     */
    public String getTopic() {
        final List<String> names = this.topics;
        return String.join(",", names);
    }

    /**
     * Debug helper: prints the current target to standard error and then delegates to the
     * target's own {@code debugDisconnectForcibly()}.
     *
     * @throws Exception whatever the target's {@code debugDisconnectForcibly()} throws
     */
    public void debugDisconnectForcibly() throws Exception {
        final String notice = "XXX: SinetStreamBaseReader: target=" + this.target;
        System.err.println(notice);
        this.target.debugDisconnectForcibly();
    }

    /**
     * Returns the configured topics.
     *
     * @return the unmodifiable topic list created in the constructor
     */
    @Generated
    public List<String> getTopics() {
        return topics;
    }

    /**
     * Returns the receive timeout.
     *
     * @return the timeout obtained from the reader parameters at construction time
     */
    @Generated
    public Duration getReceiveTimeout() {
        return receiveTimeout;
    }

    /**
     * Returns the value deserializer.
     *
     * @return the deserializer selected at construction time
     */
    @Generated
    public Deserializer<T> getDeserializer() {
        return deserializer;
    }

    /**
     * Returns the decompressor.
     *
     * @return the decompressor set up at construction time
     */
    @Generated
    public Decompressor getDecompressor() {
        return decompressor;
    }

    /**
     * Returns the composite deserializer.
     *
     * @return the deserializer that {@code toMessage} applies to the raw message bytes
     */
    @Generated
    public Deserializer<Timestamped<T>> getCompositeDeserializer() {
        return compositeDeserializer;
    }
}
