package io.fixprotocol.silverflash.fixp.flow;

import io.fixprotocol.silverflash.ExceptionConsumer;
import io.fixprotocol.silverflash.MessageConsumer;
import io.fixprotocol.silverflash.Receiver;
import io.fixprotocol.silverflash.Sequenced;
import io.fixprotocol.silverflash.Session;
import io.fixprotocol.silverflash.fixp.SessionEventTopics;
import io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow;
import io.fixprotocol.silverflash.fixp.messages.ContextDecoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderDecoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderEncoder;
import io.fixprotocol.silverflash.fixp.messages.NotAppliedEncoder;
import io.fixprotocol.silverflash.fixp.messages.SequenceDecoder;
import io.fixprotocol.silverflash.frame.MessageFrameEncoder;
import io.fixprotocol.silverflash.reactor.EventReactor;
import io.fixprotocol.silverflash.reactor.Subscription;
import io.fixprotocol.silverflash.reactor.TimerSchedule;
import io.fixprotocol.silverflash.reactor.Topic;
import io.fixprotocol.silverflash.transport.Transport;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/IdempotentFlowReceiver.class */
public class IdempotentFlowReceiver extends AbstractReceiverFlow implements FlowReceiver, Sequenced {
    // Decoder applied in accept() to session messages with template id 9; its nextSeqNo() feeds handleSequence().
    private final ContextDecoder contextDecoder;
    // Receiver subscribed to the PEER_HEARTBEAT topic when keepalive is enabled; it invokes isHeartbeatDue().
    private final Receiver heartbeatEvent;
    // Handle returned by reactor.postAtInterval(...); null when keepaliveInterval == 0. Cancelled in terminated().
    private final TimerSchedule heartbeatSchedule;
    // Handle returned by reactor.subscribe(...); null when keepaliveInterval == 0. Unsubscribed in terminated().
    private final Subscription heartbeatSubscription;
    // Flyweight view re-wrapped around every incoming ByteBuffer in accept().
    private final DirectBuffer immutableBuffer;
    // Set to true by terminated(); once set, accept() no longer forwards messages to the message consumer.
    private boolean isEndOfStream;
    // Initialised to true; cleared by handleSequence() and set again by every call to isHeartbeatDue().
    private final AtomicBoolean isHeartbeatDue;
    // Decodes the message header found at the incoming buffer's current position.
    private final MessageHeaderDecoder messageHeaderDecoder;
    // Encodes the message header of the pre-built NotApplied message held in sendBuffer.
    private final MessageHeaderEncoder messageHeaderEncoder;
    // Sequence number the next message must carry to be passed to the message consumer; starts at 1.
    private final AtomicLong nextSeqNoAccepted;
    // Sequence number assigned to the next incoming message; starts at 1 and is reset by handleSequence().
    private final AtomicLong nextSeqNoReceived;
    // Encoder positioned over sendBuffer; notifyGap() overwrites its fromSeqNo/count fields before each post.
    private final NotAppliedEncoder notAppliedEncoder;
    // 32-byte direct buffer in native byte order holding the framed NotApplied message.
    private final ByteBuffer sendBuffer;
    // Decoder applied in accept() to session messages with template id 8; its nextSeqNo() feeds handleSequence().
    private final SequenceDecoder sequenceDecoder;
    // Topic for this session's PEER_TERMINATED event; posted to by terminated() and by accept()'s default case.
    private final Topic terminatedTopic;
    // Topic for this session's APPLICATION_MESSAGE_TO_SEND event; notifyGap() posts the NotApplied message here.
    private final Topic toSendTopic;
    // Mutable view over sendBuffer used by the header and NotApplied encoders.
    private final MutableDirectBuffer mutableBuffer;

    /* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/IdempotentFlowReceiver$Builder.class */
    /**
     * Builder for {@link IdempotentFlowReceiver}.
     *
     * <p>Apart from {@link #build()}, every method here only forwards to the same-named method of
     * {@code AbstractReceiverFlow.Builder}. They carry the decompiler's bridge/synthetic markers,
     * so they appear to be compiler-generated bridge methods rather than hand-written code
     * (NOTE(review): confirm against the original sources before editing them).
     */
    public static class Builder<T extends IdempotentFlowReceiver, B extends FlowReceiverBuilder<IdempotentFlowReceiver, B>> extends AbstractReceiverFlow.Builder implements FlowReceiverBuilder {
        /**
         * Creates a new receiver configured from the state of this builder.
         *
         * @return a new {@link IdempotentFlowReceiver}
         */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public IdempotentFlowReceiver build() {
            return new IdempotentFlowReceiver(this);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withTransport(Transport transport) {
            return super.withTransport(transport);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSessionId(UUID uuid) {
            return super.withSessionId(uuid);
        }

        /** Forwards to the superclass builder after an unchecked cast of the raw session to {@code Session<UUID>}. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowReceiverBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSession(Session session) {
            return super.withSession((Session<UUID>) session);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withSequencer(Sequencer sequencer) {
            return super.withSequencer(sequencer);
        }

        /** Forwards to the superclass builder after an unchecked cast of the raw reactor to {@code EventReactor<ByteBuffer>}. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withReactor(EventReactor eventReactor) {
            return super.withReactor((EventReactor<ByteBuffer>) eventReactor);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withMessageFrameEncoder(MessageFrameEncoder messageFrameEncoder) {
            return super.withMessageFrameEncoder(messageFrameEncoder);
        }

        /** Forwards to the superclass builder after an unchecked cast of the raw consumer to {@code MessageConsumer<UUID>}. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowReceiverBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withMessageConsumer(MessageConsumer messageConsumer) {
            return super.withMessageConsumer((MessageConsumer<UUID>) messageConsumer);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withKeepaliveInterval(long j) {
            return super.withKeepaliveInterval(j);
        }

        /** Forwards to the superclass builder unchanged. */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractReceiverFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractReceiverFlow.Builder withExceptionConsumer(ExceptionConsumer exceptionConsumer) {
            return super.withExceptionConsumer(exceptionConsumer);
        }
    }

    /**
     * Static factory for a fresh {@link Builder}.
     *
     * @return a new, unconfigured builder for {@link IdempotentFlowReceiver}
     */
    public static Builder<IdempotentFlowReceiver, ? extends FlowReceiverBuilder> builder() {
        // The builder's type arguments are left to diamond inference against the declared return type.
        return new Builder<>();
    }

    /**
     * Creates the receiver from a configured builder.
     *
     * <p>Besides initialising the decoders and sequence counters (both counters start at 1), the
     * constructor pre-encodes a complete NotApplied message (frame header, message header, body
     * with {@code fromSeqNo} and {@code count} set to 0, frame trailer) into {@code sendBuffer},
     * so that {@code notifyGap} only has to overwrite the two body fields before posting it.
     *
     * <p>When the keepalive interval is non-zero, the receiver subscribes to the session's
     * {@code PEER_HEARTBEAT} topic and schedules a recurring post to that topic at the keepalive
     * interval; when it is zero, no subscription or schedule is created.
     *
     * @param builder source of the configuration consumed by the superclass constructor
     * @throws NullPointerException if no message consumer was configured
     */
    protected IdempotentFlowReceiver(Builder builder) {
        super(builder);
        this.contextDecoder = new ContextDecoder();
        this.heartbeatEvent = byteBuffer -> {
            // The returned "due" state is not acted upon here; the call is kept because it
            // re-arms the flag (getAndSet(true)) on every heartbeat tick.
            isHeartbeatDue();
        };
        this.immutableBuffer = new UnsafeBuffer(new byte[0]);
        this.isEndOfStream = false;
        this.isHeartbeatDue = new AtomicBoolean(true);
        this.messageHeaderDecoder = new MessageHeaderDecoder();
        this.messageHeaderEncoder = new MessageHeaderEncoder();
        this.nextSeqNoAccepted = new AtomicLong(1L);
        this.nextSeqNoReceived = new AtomicLong(1L);
        this.notAppliedEncoder = new NotAppliedEncoder();
        this.sendBuffer = ByteBuffer.allocateDirect(32).order(ByteOrder.nativeOrder());
        this.sequenceDecoder = new SequenceDecoder();
        this.mutableBuffer = new UnsafeBuffer(this.sendBuffer);
        Objects.requireNonNull(this.messageConsumer);

        // Lay out the NotApplied message once: frame header, then message header, then body.
        this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
        final int messageHeaderOffset = this.frameEncoder.getHeaderLength();
        this.messageHeaderEncoder.wrap(this.mutableBuffer, messageHeaderOffset);
        this.messageHeaderEncoder.blockLength(this.notAppliedEncoder.sbeBlockLength()).templateId(this.notAppliedEncoder.sbeTemplateId()).schemaId(this.notAppliedEncoder.sbeSchemaId()).version(this.notAppliedEncoder.sbeSchemaVersion());
        final int bodyOffset = messageHeaderOffset + this.messageHeaderEncoder.encodedLength();
        this.notAppliedEncoder.wrap(this.mutableBuffer, bodyOffset);
        this.notAppliedEncoder.fromSeqNo(0L);
        this.notAppliedEncoder.count(0L);
        // The decompiled code used an undefined identifier here (a leftover register name).
        // NOTE(review): it is reconstructed as the running offset at which the body was
        // wrapped, the last int computed before this call; confirm against the original sources.
        this.frameEncoder.setMessageLength(bodyOffset + this.notAppliedEncoder.encodedLength());
        this.frameEncoder.encodeFrameTrailer();

        this.toSendTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.ToSessionEventType.APPLICATION_MESSAGE_TO_SEND);
        this.terminatedTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.SessionEventType.PEER_TERMINATED);
        if (this.keepaliveInterval == 0) {
            // Keepalive disabled: nothing to subscribe to or to cancel later.
            this.heartbeatSubscription = null;
            this.heartbeatSchedule = null;
        } else {
            Topic heartbeatTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.SessionEventType.PEER_HEARTBEAT);
            this.heartbeatSubscription = this.reactor.subscribe(heartbeatTopic, this.heartbeatEvent);
            // The timer posts a null payload; heartbeatEvent ignores its argument.
            this.heartbeatSchedule = this.reactor.postAtInterval(heartbeatTopic, null, this.keepaliveInterval);
        }
    }

    /**
     * Handles one inbound message whose header starts at the buffer's current position.
     *
     * <p>If the header's schema id equals the session schema id (taken from
     * {@code sequenceDecoder}), the message is dispatched on its template id:
     * <ul>
     *   <li>8 — decoded with {@code sequenceDecoder} and passed to {@code onSequence}</li>
     *   <li>9 — decoded with {@code contextDecoder} and passed to {@code onContext}</li>
     *   <li>14 — passed to {@code terminated} (presumably the Terminate message — TODO confirm)</li>
     *   <li>18 — no session handling; continues on the application path
     *       (presumably FinishedSending — TODO confirm)</li>
     *   <li>anything else — the buffer is posted to the terminated topic</li>
     * </ul>
     * Templates 8, 9 and 14 are consumed here. Every other message is treated as an application
     * message: unless the stream has ended, it is assigned the next received sequence number and
     * is forwarded to the message consumer only when that number is the next one expected.
     *
     * @param byteBuffer buffer positioned at the start of the message header
     */
    @Override // java.util.function.Consumer
    public void accept(ByteBuffer byteBuffer) {
        this.immutableBuffer.wrap(byteBuffer);
        final int headerOffset = byteBuffer.position();
        this.messageHeaderDecoder.wrap(this.immutableBuffer, headerOffset);
        final int bodyOffset = headerOffset + this.messageHeaderDecoder.encodedLength();
        boolean isApplicationMessage = true;
        if (this.messageHeaderDecoder.schemaId() == this.sequenceDecoder.sbeSchemaId()) {
            switch (this.messageHeaderDecoder.templateId()) {
                case 8:
                    this.sequenceDecoder.wrap(this.immutableBuffer, bodyOffset, this.sequenceDecoder.sbeBlockLength(), this.sequenceDecoder.sbeSchemaVersion());
                    onSequence(this.sequenceDecoder);
                    isApplicationMessage = false;
                    break;
                case 9:
                    this.contextDecoder.wrap(this.immutableBuffer, bodyOffset, this.contextDecoder.sbeBlockLength(), this.contextDecoder.sbeSchemaVersion());
                    onContext(this.contextDecoder);
                    isApplicationMessage = false;
                    break;
                case 14:
                    terminated(byteBuffer);
                    isApplicationMessage = false;
                    break;
                case 18:
                    // Intentionally no session-level action.
                    break;
                default:
                    // NOTE(review): the message is reported on the terminated topic but is still
                    // processed as an application message below; confirm this is intended.
                    this.reactor.post(this.terminatedTopic, byteBuffer);
                    break;
            }
        }
        if (!isApplicationMessage || this.isEndOfStream) {
            return;
        }
        final long seqNo = this.nextSeqNoReceived.getAndIncrement();
        // Claim the expected sequence number and advance it in one atomic step, so a concurrent
        // update of nextSeqNoAccepted cannot slip in between the check and the increment.
        if (this.nextSeqNoAccepted.compareAndSet(seqNo, seqNo + 1)) {
            this.messageConsumer.accept(byteBuffer, this.session, seqNo);
        }
    }

    /**
     * Reports the sequence number that will be assigned to the next received application message.
     *
     * @return the current value of the received-sequence counter
     */
    @Override // io.fixprotocol.silverflash.Sequenced
    public long getNextSeqNo() {
        final long nextReceived = this.nextSeqNoReceived.get();
        return nextReceived;
    }

    /**
     * Applies a sequence number announced by the peer.
     *
     * <p>Clears the heartbeat-due flag and resets the received counter to the announced value.
     * If the announced value is beyond the next accepted sequence number, a gap is reported via
     * {@code notifyGap} (starting at the previous received counter) and the accepted counter is
     * moved forward to the announced value; otherwise the accepted counter is left alone.
     *
     * @param announcedSeqNo next sequence number announced by the peer
     */
    private void handleSequence(long announcedSeqNo) {
        this.isHeartbeatDue.set(false);
        final long previousReceived = this.nextSeqNoReceived.getAndSet(announcedSeqNo);
        final long expected = this.nextSeqNoAccepted.get();
        if (announcedSeqNo <= expected) {
            // Nothing was skipped; keep the accepted counter where it is.
            return;
        }
        final int missingCount = (int) (announcedSeqNo - previousReceived);
        notifyGap(previousReceived, missingCount);
        this.nextSeqNoAccepted.set(announcedSeqNo);
    }

    /**
     * Reads the heartbeat-due flag and re-arms it in the same atomic step.
     *
     * <p>The flag is cleared by {@code handleSequence}, so this returns {@code false} only when a
     * sequence announcement was handled since the previous call.
     *
     * @return the flag's value before this call set it to {@code true}
     */
    @Override // io.fixprotocol.silverflash.fixp.flow.FlowReceiver
    public boolean isHeartbeatDue() {
        final boolean wasDue = this.isHeartbeatDue.getAndSet(true);
        return wasDue;
    }

    /**
     * Reports a range of missing messages by posting the pre-built NotApplied message.
     *
     * <p>Only the two body fields of the message in {@code sendBuffer} are rewritten; a duplicate
     * of that buffer is then posted to the to-send topic.
     *
     * @param j first sequence number of the gap
     * @param i number of messages in the gap
     */
    void notifyGap(long j, int i) {
        this.notAppliedEncoder.fromSeqNo(j);
        this.notAppliedEncoder.count(i);
        final ByteBuffer notAppliedMessage = this.sendBuffer.duplicate();
        this.reactor.post(this.toSendTopic, notAppliedMessage);
    }

    /**
     * Handles a decoded Context message by applying the sequence number it carries.
     *
     * @param contextDecoder decoder already wrapped over the message body
     */
    void onContext(ContextDecoder contextDecoder) {
        final long announcedSeqNo = contextDecoder.nextSeqNo();
        handleSequence(announcedSeqNo);
    }

    /**
     * Handles a decoded Sequence message by applying the sequence number it carries.
     *
     * @param sequenceDecoder decoder already wrapped over the message body
     */
    void onSequence(SequenceDecoder sequenceDecoder) {
        final long announcedSeqNo = sequenceDecoder.nextSeqNo();
        handleSequence(announcedSeqNo);
    }

    /**
     * Marks the inbound stream as ended and publishes the terminating message.
     *
     * <p>After the buffer is posted to the terminated topic, the heartbeat timer is cancelled
     * and its subscription removed, if keepalive was enabled.
     *
     * @param terminateMessage buffer containing the message that ended the stream
     */
    private void terminated(ByteBuffer terminateMessage) {
        this.isEndOfStream = true;
        this.reactor.post(this.terminatedTopic, terminateMessage);
        if (this.heartbeatSchedule == null) {
            // Keepalive was never set up, so there is nothing to tear down.
            return;
        }
        this.heartbeatSchedule.cancel();
        this.heartbeatSubscription.unsubscribe();
    }
}
