package io.fixprotocol.silverflash.fixp.flow;

import io.fixprotocol.silverflash.ExceptionConsumer;
import io.fixprotocol.silverflash.MessageConsumer;
import io.fixprotocol.silverflash.Receiver;
import io.fixprotocol.silverflash.Sequenced;
import io.fixprotocol.silverflash.Session;
import io.fixprotocol.silverflash.fixp.SessionEventTopics;
import io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderDecoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderEncoder;
import io.fixprotocol.silverflash.fixp.messages.RetransmissionDecoder;
import io.fixprotocol.silverflash.fixp.messages.RetransmitRequestEncoder;
import io.fixprotocol.silverflash.fixp.messages.SequenceDecoder;
import io.fixprotocol.silverflash.frame.MessageFrameEncoder;
import io.fixprotocol.silverflash.reactor.EventReactor;
import io.fixprotocol.silverflash.reactor.Subscription;
import io.fixprotocol.silverflash.reactor.TimerSchedule;
import io.fixprotocol.silverflash.reactor.Topic;
import io.fixprotocol.silverflash.transport.Transport;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/RecoverableFlowReceiver.class */
public class RecoverableFlowReceiver extends AbstractReceiverFlow implements Sequenced, FlowReceiver {
    // Topic to which finished() posts the received buffer; resolved for SESSION_FINISHED in the constructor.
    private final Topic finishedTopic;
    // Handler subscribed to the PEER_HEARTBEAT topic; it calls terminated() when isHeartbeatDue() returns true.
    private final Receiver heartbeatEvent;
    // Handle returned by reactor.postAtInterval(...) for the recurring heartbeat post; cancelled in terminated().
    private final TimerSchedule heartbeatSchedule;
    // Subscription of heartbeatEvent on the heartbeat topic; unsubscribed in terminated().
    private final Subscription heartbeatSubscription;
    // Set by finished() and terminated(); once true, accept() no longer forwards application messages.
    private boolean isEndOfStream;
    // Set to true by every isHeartbeatDue() poll and reset to false by onSequence().
    private final AtomicBoolean isHeartbeatDue;
    // Set to true by onRetransmission() for a matching retransmission and reset to false by onSequence().
    private final AtomicBoolean isRetransmission;
    // System.nanoTime() value written into the most recent retransmit request by notifyGap();
    // onRetransmission() compares it with the timestamp echoed in the response.
    private long lastRequestTimestamp;
    // Highest sequence number that accept() still forwards while isRetransmission is set; assigned in onRetransmission().
    private long lastRetransSeqNoToAccept;
    // Sequence number assigned by accept() to the next message received while isRetransmission is set.
    private final AtomicLong nextRetransSeqNoReceived;
    // Sequence number a message must carry for accept() to forward it outside of a retransmission.
    private final AtomicLong nextSeqNoAccepted;
    // Sequence number assigned by accept() to the next message outside of a retransmission; returned by getNextSeqNo().
    private final AtomicLong nextSeqNoReceived;
    // 64-byte direct buffer in native byte order holding the pre-framed retransmit request posted by notifyGap().
    private final ByteBuffer sendBuffer;
    // Encoder wrapped over mutableBuffer in the constructor; notifyGap() fills in its fields.
    private final RetransmitRequestEncoder retransmitRequestEncoder;
    // 16-byte scratch array into which onRetransmission() copies the session id of the incoming message.
    private final byte[] retransSessionId;
    // Topic to which notifyGap() posts sendBuffer; resolved for SERVICE_STORE_RETREIVE in the constructor.
    private final Topic retrieveTopic;
    // Topic to which terminated(), accept() and onRetransmission() post; resolved for PEER_TERMINATED in the constructor.
    private final Topic terminatedTopic;
    // UnsafeBuffer view over sendBuffer used by the encoders.
    private final MutableDirectBuffer mutableBuffer;
    // Encoder for the message header that precedes the retransmit request inside sendBuffer.
    private final MessageHeaderEncoder messageHeaderEncoder;
    // Buffer view that accept() re-wraps around every incoming ByteBuffer before decoding.
    private final DirectBuffer immutableBuffer;
    // Decoder for the header of every incoming message.
    private final MessageHeaderDecoder messageHeaderDecoder;
    // Decoder used by accept() for template id 8; its schema id is also what accept() compares headers against.
    private final SequenceDecoder sequenceDecoder;
    // Decoder used by accept() for template id 12.
    private final RetransmissionDecoder retransmissionDecoder;

    /* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/RecoverableFlowReceiver$Builder.class */
    /**
     * Builder for {@link RecoverableFlowReceiver}.
     *
     * <p>{@link #build()} passes this builder to the {@code RecoverableFlowReceiver} constructor.
     * Every {@code with*} method below simply forwards its argument to the superclass
     * implementation and returns the result typed as {@code AbstractReceiverFlow.Builder}.
     *
     * <p>NOTE(review): the {@code bridge}/{@code synthetic} markers suggest these overrides were
     * generated by the compiler for covariant return types rather than written by hand -
     * confirm against the original sources before editing them.
     */
    public static class Builder<T extends RecoverableFlowReceiver, B extends FlowReceiverBuilder<RecoverableFlowReceiver, B>> extends AbstractReceiverFlow.Builder implements FlowReceiverBuilder {
        /**
         * Creates the receiver from the values held by this builder.
         *
         * @return a new {@code RecoverableFlowReceiver}
         */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public RecoverableFlowReceiver build() {
            return new RecoverableFlowReceiver(this);
        }

        // Forwards the transport to the superclass builder.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withTransport(Transport transport) {
            return super.withTransport(transport);
        }

        // Forwards the session id to the superclass builder.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSessionId(UUID uuid) {
            return super.withSessionId(uuid);
        }

        // Forwards the session to the superclass builder; the raw argument is cast (unchecked) to Session<UUID>.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowReceiverBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSession(Session session) {
            return super.withSession((Session<UUID>) session);
        }

        // Forwards the sequencer to the superclass builder.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSequencer(Sequencer sequencer) {
            return super.withSequencer(sequencer);
        }

        // Forwards the reactor to the superclass builder; the raw argument is cast (unchecked) to EventReactor<ByteBuffer>.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withReactor(EventReactor eventReactor) {
            return super.withReactor((EventReactor<ByteBuffer>) eventReactor);
        }

        // Forwards the frame encoder to the superclass builder.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withMessageFrameEncoder(MessageFrameEncoder messageFrameEncoder) {
            return super.withMessageFrameEncoder(messageFrameEncoder);
        }

        // Forwards the message consumer to the superclass builder; the raw argument is cast (unchecked) to MessageConsumer<UUID>.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowReceiverBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withMessageConsumer(MessageConsumer messageConsumer) {
            return super.withMessageConsumer((MessageConsumer<UUID>) messageConsumer);
        }

        // Forwards the keepalive interval to the superclass builder.
        // NOTE(review): the unit of the interval is not visible here - confirm against the superclass.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withKeepaliveInterval(long j) {
            return super.withKeepaliveInterval(j);
        }

        // Forwards the exception consumer to the superclass builder.
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withExceptionConsumer(ExceptionConsumer exceptionConsumer) {
            return super.withExceptionConsumer(exceptionConsumer);
        }
    }

    /**
     * Creates a new, empty {@link Builder} for this receiver type.
     *
     * @return a fresh builder instance; a new object is allocated on every call
     */
    public static Builder<RecoverableFlowReceiver, ? extends FlowReceiverBuilder> builder() {
        return new Builder<>();
    }

    /**
     * Creates the receiver, pre-encodes the constant part of the retransmit request into
     * {@code sendBuffer}, and registers the heartbeat handler with the reactor.
     *
     * <p>The inherited fields used here ({@code messageConsumer}, {@code frameEncoder},
     * {@code sessionId}, {@code reactor}, {@code keepaliveInterval}) are presumably populated by
     * the superclass constructor from {@code builder} - confirm against
     * {@code AbstractReceiverFlow}.
     *
     * @param builder source of the configuration handed to the superclass constructor
     * @throws NullPointerException if no message consumer was configured
     */
    protected RecoverableFlowReceiver(Builder builder) {
        super(builder);
        // Runs on every heartbeat tick: if no Sequence message cleared the flag since the
        // previous tick, the flow is terminated without a triggering message.
        this.heartbeatEvent = byteBuffer -> {
            if (isHeartbeatDue()) {
                terminated(null);
            }
        };
        this.isEndOfStream = false;
        this.isHeartbeatDue = new AtomicBoolean(true);
        this.isRetransmission = new AtomicBoolean();
        this.lastRequestTimestamp = 0L;
        this.lastRetransSeqNoToAccept = 0L;
        this.nextRetransSeqNoReceived = new AtomicLong(1L);
        this.nextSeqNoAccepted = new AtomicLong(1L);
        this.nextSeqNoReceived = new AtomicLong(1L);
        this.sendBuffer = ByteBuffer.allocateDirect(64).order(ByteOrder.nativeOrder());
        this.retransmitRequestEncoder = new RetransmitRequestEncoder();
        this.retransSessionId = new byte[16];
        this.mutableBuffer = new UnsafeBuffer(this.sendBuffer);
        this.messageHeaderEncoder = new MessageHeaderEncoder();
        this.immutableBuffer = new UnsafeBuffer(new byte[0]);
        this.messageHeaderDecoder = new MessageHeaderDecoder();
        this.sequenceDecoder = new SequenceDecoder();
        this.retransmissionDecoder = new RetransmissionDecoder();
        Objects.requireNonNull(this.messageConsumer, "messageConsumer");

        // Lay out the retransmit request once: frame header, message header, then the body.
        // notifyGap() later only rewrites the body fields.
        this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
        final int headerOffset = this.frameEncoder.getHeaderLength();
        this.messageHeaderEncoder.wrap(this.mutableBuffer, headerOffset);
        this.messageHeaderEncoder.blockLength(this.retransmitRequestEncoder.sbeBlockLength()).templateId(this.retransmitRequestEncoder.sbeTemplateId()).schemaId(this.retransmitRequestEncoder.sbeSchemaId()).version(this.retransmitRequestEncoder.sbeSchemaVersion());
        // The decompiled source referenced an undeclared register here; the running offset of
        // the request body is the value that was in scope at that point.
        // NOTE(review): confirm against the original sources that the message length is meant
        // to be measured from the start of the buffer.
        final int bodyOffset = headerOffset + this.messageHeaderEncoder.encodedLength();
        this.retransmitRequestEncoder.wrap(this.mutableBuffer, bodyOffset);
        this.frameEncoder.setMessageLength(bodyOffset + this.retransmitRequestEncoder.encodedLength());
        this.frameEncoder.encodeFrameTrailer();

        this.retrieveTopic = SessionEventTopics.getTopic(SessionEventTopics.ServiceEventType.SERVICE_STORE_RETREIVE);
        this.terminatedTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.SessionEventType.PEER_TERMINATED);
        this.finishedTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.FromSessionEventType.SESSION_FINISHED);

        // Subscribe first, then start the periodic (payload-free) post that drives heartbeatEvent.
        Topic topic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.SessionEventType.PEER_HEARTBEAT);
        this.heartbeatSubscription = this.reactor.subscribe(topic, this.heartbeatEvent);
        this.heartbeatSchedule = this.reactor.postAtInterval(topic, null, this.keepaliveInterval);
    }

    /**
     * Handles one incoming message.
     *
     * <p>The message header is decoded at the buffer's current position. A message whose schema
     * id equals the session schema id is treated as session-layer traffic and dispatched on its
     * template id:
     * <ul>
     *   <li>8 - decoded with {@code sequenceDecoder} and passed to {@link #onSequence};</li>
     *   <li>12 - decoded with {@code retransmissionDecoder} and passed to
     *       {@link #onRetransmission};</li>
     *   <li>14 - handled by {@code terminated};</li>
     *   <li>15 - handled by {@code finished};</li>
     *   <li>any other template - posted to the terminated topic.</li>
     * </ul>
     * Session-layer messages are never forwarded to the message consumer and never consume a
     * sequence number.
     *
     * <p>Any other message is an application message. Once the end of the stream has been
     * signalled it is dropped. While a retransmission is in progress it is numbered from
     * {@code nextRetransSeqNoReceived} and forwarded only up to {@code lastRetransSeqNoToAccept}
     * (inclusive). Otherwise it is numbered from {@code nextSeqNoReceived} and forwarded only
     * when that number is the next one expected by {@code nextSeqNoAccepted}.
     *
     * @param byteBuffer buffer positioned at the start of the message header
     */
    @Override // java.util.function.Consumer
    public void accept(ByteBuffer byteBuffer) {
        this.immutableBuffer.wrap(byteBuffer);
        final int headerOffset = byteBuffer.position();
        this.messageHeaderDecoder.wrap(this.immutableBuffer, headerOffset);
        final int bodyOffset = headerOffset + this.messageHeaderDecoder.encodedLength();

        boolean isApplicationMessage = true;
        if (this.messageHeaderDecoder.schemaId() == this.sequenceDecoder.sbeSchemaId()) {
            // Everything carrying the session schema id is session-layer traffic, including
            // templates this receiver does not expect; none of it belongs to the application.
            isApplicationMessage = false;
            switch (this.messageHeaderDecoder.templateId()) {
                case 8:
                    this.sequenceDecoder.wrap(this.immutableBuffer, bodyOffset, this.sequenceDecoder.sbeBlockLength(), this.sequenceDecoder.sbeSchemaVersion());
                    onSequence(this.sequenceDecoder);
                    break;
                case 12:
                    // Wrap with the retransmission decoder's own block length and schema version.
                    this.retransmissionDecoder.wrap(this.immutableBuffer, bodyOffset, this.retransmissionDecoder.sbeBlockLength(), this.retransmissionDecoder.sbeSchemaVersion());
                    onRetransmission(this.retransmissionDecoder, byteBuffer);
                    break;
                case 14:
                    terminated(byteBuffer);
                    break;
                case 15:
                    finished(byteBuffer);
                    break;
                default:
                    // Unexpected session message (template ids 9, 10, 11, 13 and anything else):
                    // report it on the terminated topic.
                    this.reactor.post(this.terminatedTopic, byteBuffer);
                    break;
            }
        }
        if (!isApplicationMessage || this.isEndOfStream) {
            return;
        }

        if (this.isRetransmission.get()) {
            final long retransSeqNo = this.nextRetransSeqNoReceived.getAndIncrement();
            if (retransSeqNo <= this.lastRetransSeqNoToAccept) {
                this.messageConsumer.accept(byteBuffer, this.session, retransSeqNo);
            }
            return;
        }

        final long seqNo = this.nextSeqNoReceived.getAndIncrement();
        // Single atomic step: advance the accepted counter only if this is the expected number.
        if (this.nextSeqNoAccepted.compareAndSet(seqNo, seqNo + 1)) {
            this.messageConsumer.accept(byteBuffer, this.session, seqNo);
        }
    }

    /**
     * Handles a Retransmission message announcing a batch of retransmitted messages.
     *
     * <p>The message is accepted only when it echoes the timestamp of the most recent request
     * sent by {@link #notifyGap} and carries this flow's session id; otherwise the buffer is
     * posted to the terminated topic and no state changes. On acceptance the receiver enters
     * retransmission mode: the next retransmitted message is numbered {@code nextSeqNo} and
     * messages are accepted up to and including {@code nextSeqNo + count - 1}.
     *
     * @param retransmissionDecoder decoder already wrapped over the message body
     * @param byteBuffer            the raw message, forwarded to the terminated topic on mismatch
     */
    void onRetransmission(RetransmissionDecoder retransmissionDecoder, ByteBuffer byteBuffer) {
        for (int i = 0; i < this.retransSessionId.length; i++) {
            this.retransSessionId[i] = (byte) retransmissionDecoder.sessionId(i);
        }
        final long firstSeqNo = retransmissionDecoder.nextSeqNo();
        final long requestTimestamp = retransmissionDecoder.requestTimestamp();
        final long count = retransmissionDecoder.count();

        final boolean answersOurRequest = requestTimestamp == this.lastRequestTimestamp
                && Arrays.equals(this.uuidAsBytes, this.retransSessionId);
        if (!answersOurRequest) {
            this.reactor.post(this.terminatedTopic, byteBuffer);
            return;
        }

        this.nextRetransSeqNoReceived.set(firstSeqNo);
        // accept() compares inclusively (<=), so store the number of the last message in the
        // batch, not one past it. A count of zero leaves nothing to accept.
        this.lastRetransSeqNoToAccept = firstSeqNo + count - 1;
        this.isRetransmission.set(true);
    }

    /**
     * Handles a Sequence message.
     *
     * <p>Clears the heartbeat-due flag, replaces the next expected received sequence number with
     * the announced one, and leaves retransmission mode. When the announced number is ahead of
     * the next number to accept, a retransmit request is issued via {@link #notifyGap} starting
     * at the previously expected received number, and the accepted counter jumps forward to the
     * announced number.
     *
     * @param sequenceDecoder decoder already wrapped over the message body
     */
    void onSequence(SequenceDecoder sequenceDecoder) {
        this.isHeartbeatDue.set(false);

        final long announcedSeqNo = sequenceDecoder.nextSeqNo();
        final long previouslyExpected = this.nextSeqNoReceived.getAndSet(announcedSeqNo);
        final long nextToAccept = this.nextSeqNoAccepted.get();
        this.isRetransmission.set(false);

        // Nothing is missing unless the peer is ahead of what has been accepted so far.
        if (announcedSeqNo <= nextToAccept) {
            return;
        }
        final int missingCount = (int) (announcedSeqNo - previouslyExpected);
        notifyGap(previouslyExpected, missingCount);
        this.nextSeqNoAccepted.set(announcedSeqNo);
    }

    /**
     * Marks the end of the stream and republishes the triggering message on the finished topic.
     *
     * @param byteBuffer the received message; it is rewound before being posted
     */
    private void finished(ByteBuffer byteBuffer) {
        isEndOfStream = true;
        // Rewind so that subscribers read the message from its beginning.
        byteBuffer.rewind();
        reactor.post(finishedTopic, byteBuffer);
    }

    /**
     * Returns the sequence number that will be assigned to the next message received outside of
     * a retransmission.
     *
     * @return the current value of the received-sequence counter
     */
    @Override // io.fixprotocol.silverflash.Sequenced
    public long getNextSeqNo() {
        final long nextExpected = nextSeqNoReceived.get();
        return nextExpected;
    }

    /**
     * Reports whether the heartbeat flag was still set, and re-arms it.
     *
     * <p>This is a read-and-set operation: the flag is unconditionally set to {@code true} and
     * its previous value is returned. {@link #onSequence} sets it back to {@code false}.
     *
     * @return the value the flag had before this call
     */
    @Override // io.fixprotocol.silverflash.fixp.flow.FlowReceiver
    public boolean isHeartbeatDue() {
        final boolean wasDue = isHeartbeatDue.getAndSet(true);
        return wasDue;
    }

    /**
     * Fills in the pre-framed retransmit request and posts it on the retrieve topic.
     *
     * <p>The request carries this flow's 16-byte session id, a fresh {@code System.nanoTime()}
     * timestamp (remembered in {@code lastRequestTimestamp} so the response can be matched), the
     * first sequence number wanted and the number of messages wanted.
     *
     * @param j first sequence number to request
     * @param i number of messages to request
     */
    void notifyGap(long j, int i) {
        final long fromSeqNo = j;
        final int messageCount = i;

        int idIndex = 0;
        while (idIndex < 16) {
            retransmitRequestEncoder.sessionId(idIndex, uuidAsBytes[idIndex]);
            idIndex++;
        }

        final long requestTimestamp = System.nanoTime();
        lastRequestTimestamp = requestTimestamp;
        retransmitRequestEncoder.timestamp(requestTimestamp);
        retransmitRequestEncoder.fromSeqNo(fromSeqNo);
        retransmitRequestEncoder.count(messageCount);

        reactor.post(retrieveTopic, sendBuffer);
    }

    /**
     * Ends the flow: marks the end of the stream, publishes on the terminated topic, and stops
     * the heartbeat timer and subscription.
     *
     * @param byteBuffer the message that triggered termination, or {@code null} when the flow is
     *                   terminated by the heartbeat handler with no triggering message; a
     *                   non-null buffer is rewound before being posted
     */
    private void terminated(ByteBuffer byteBuffer) {
        this.isEndOfStream = true;
        // The heartbeat handler passes null; rewinding unconditionally would throw and skip the
        // notification and the timer/subscription cleanup below.
        if (byteBuffer != null) {
            byteBuffer.rewind();
        }
        // NOTE(review): a null payload is posted on heartbeat timeout, as the constructor already
        // does for the heartbeat topic - confirm terminated-topic subscribers tolerate it.
        this.reactor.post(this.terminatedTopic, byteBuffer);
        this.heartbeatSchedule.cancel();
        this.heartbeatSubscription.unsubscribe();
    }
}
