package io.fixprotocol.silverflash.fixp.flow;

import io.fixprotocol.silverflash.ExceptionConsumer;
import io.fixprotocol.silverflash.Receiver;
import io.fixprotocol.silverflash.RecoverableSender;
import io.fixprotocol.silverflash.fixp.SessionEventTopics;
import io.fixprotocol.silverflash.fixp.flow.AbstractFlow;
import io.fixprotocol.silverflash.fixp.messages.FinishedSendingEncoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderEncoder;
import io.fixprotocol.silverflash.fixp.messages.RetransmissionEncoder;
import io.fixprotocol.silverflash.fixp.store.MessageStore;
import io.fixprotocol.silverflash.fixp.store.StoreException;
import io.fixprotocol.silverflash.frame.MessageFrameEncoder;
import io.fixprotocol.silverflash.reactor.EventReactor;
import io.fixprotocol.silverflash.reactor.Subscription;
import io.fixprotocol.silverflash.reactor.TimerSchedule;
import io.fixprotocol.silverflash.reactor.Topic;
import io.fixprotocol.silverflash.transport.Transport;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/RecoverableFlowSender.class */
public class RecoverableFlowSender extends AbstractFlow implements RecoverableSender, MutableSequence {
    /** Zero-length buffer array; passed to {@code send} by {@code sendHeartbeat()}. */
    private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
    /** Flag used as a spin lock (via compareAndSet) around every transport write in this class. */
    private final AtomicBoolean criticalSection;
    /** Encoder for the message written by {@code sendEndOfStream()}. */
    private final FinishedSendingEncoder finishedSendingEncoder;
    /** Receiver subscribed to the session heartbeat topic; invokes {@code sendHeartbeat()}. */
    private final Receiver heartbeatEvent;
    /** Handle returned by {@code reactor.postAtInterval}; not referenced again in the code shown. */
    private final TimerSchedule heartbeatSchedule;
    /** Handle returned by {@code reactor.subscribe}; not referenced again in the code shown. */
    private final Subscription heartbeatSubscription;
    /** Starts true; set false after each write in send/resend, and set back to true when polled. */
    private final AtomicBoolean isHeartbeatDue;
    /** Set true by the multi-message {@code resend}; never read in the code shown. */
    private final AtomicBoolean isRetransmission;
    /** Encoder for the message header that precedes each session-level message body. */
    private final MessageHeaderEncoder messageHeaderEncoder;
    /** Single-slot scratch array reused by {@code send(ByteBuffer)}. */
    private final ByteBuffer[] one;
    /** Encoder for the message written ahead of resent application messages. */
    private final RetransmissionEncoder retransmissionEncoder;
    /** 64-byte direct buffer in native byte order that session-level messages are encoded into. */
    private final ByteBuffer sendBuffer;
    /** 32-slot scratch array handed to {@code transport.write} by the resend methods. */
    private final ByteBuffer[] srcs;
    /** Message store taken from the builder (required non-null); used by {@code persist}. */
    private final MessageStore store;
    /** Agrona view over {@link #sendBuffer}; the SBE encoders wrap this buffer. */
    private final MutableDirectBuffer mutableBuffer;

    /* loaded from: input_file:io/fixprotocol/silverflash/fixp/flow/RecoverableFlowSender$Builder.class */
    /**
     * Builder for {@link RecoverableFlowSender}. Adds a message store setting on top of the
     * settings handled by {@code AbstractFlow.Builder}.
     *
     * @param <T> type of flow returned by {@link #build()}
     * @param <B> builder type returned by {@link #withMessageStore(MessageStore)}
     */
    public static class Builder<T extends RecoverableFlowSender, B extends FlowBuilder> extends AbstractFlow.Builder {
        private MessageStore store;

        /**
         * Creates the flow from the settings collected so far.
         *
         * @return a new sender, cast to {@code T}
         * @throws NullPointerException if no message store was supplied
         */
        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        @SuppressWarnings("unchecked")
        public T build() {
            return (T) new RecoverableFlowSender(this);
        }

        /**
         * Sets the message store handed to the sender.
         *
         * @param messageStore store to use; the sender's constructor rejects {@code null}
         * @return this builder
         */
        @SuppressWarnings("unchecked")
        public B withMessageStore(MessageStore messageStore) {
            this.store = messageStore;
            // `this` is a Builder, not statically a B; the cast is needed for the fluent return.
            return (B) this;
        }

        // The overrides below only delegate to the superclass implementation.

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withTransport(Transport transport) {
            return super.withTransport(transport);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withSessionId(UUID uuid) {
            return super.withSessionId(uuid);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withSequencer(Sequencer sequencer) {
            return super.withSequencer(sequencer);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withReactor(EventReactor eventReactor) {
            return super.withReactor((EventReactor<ByteBuffer>) eventReactor);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withMessageFrameEncoder(MessageFrameEncoder messageFrameEncoder) {
            return super.withMessageFrameEncoder(messageFrameEncoder);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withKeepaliveInterval(long j) {
            return super.withKeepaliveInterval(j);
        }

        @Override // io.fixprotocol.silverflash.fixp.flow.AbstractFlow.Builder, io.fixprotocol.silverflash.fixp.flow.FlowBuilder
        public /* bridge */ /* synthetic */ AbstractFlow.Builder withExceptionConsumer(ExceptionConsumer exceptionConsumer) {
            return super.withExceptionConsumer(exceptionConsumer);
        }
    }

    /**
     * Creates a new builder with no settings applied.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        return new Builder<>();
    }

    /**
     * Creates a sender from builder settings, subscribes a heartbeat handler to the session's
     * heartbeat topic, and schedules a recurring event on that topic at the keepalive interval.
     *
     * @param builder source of settings; its message store must be non-null
     * @throws NullPointerException if the builder has no message store
     */
    protected RecoverableFlowSender(Builder builder) {
        super(builder);

        // State flags.
        this.criticalSection = new AtomicBoolean();
        this.isRetransmission = new AtomicBoolean();
        this.isHeartbeatDue = new AtomicBoolean(true);

        // Encoders for session-level messages.
        this.messageHeaderEncoder = new MessageHeaderEncoder();
        this.retransmissionEncoder = new RetransmissionEncoder();
        this.finishedSendingEncoder = new FinishedSendingEncoder();

        // Scratch buffers and arrays reused across calls.
        this.one = new ByteBuffer[1];
        this.srcs = new ByteBuffer[32];
        this.sendBuffer = ByteBuffer.allocateDirect(64).order(ByteOrder.nativeOrder());
        this.mutableBuffer = new UnsafeBuffer(this.sendBuffer);

        this.store = Objects.requireNonNull(builder.store);

        // On a failed heartbeat write, post a session-suspended event carrying the same payload.
        this.heartbeatEvent = event -> {
            try {
                sendHeartbeat();
            } catch (IOException e) {
                Topic suspended = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.FromSessionEventType.SESSION_SUSPENDED);
                this.reactor.post(suspended, event);
            }
        };

        Topic heartbeatTopic = SessionEventTopics.getTopic(this.sessionId, SessionEventTopics.SessionEventType.HEARTBEAT);
        this.heartbeatSubscription = this.reactor.subscribe(heartbeatTopic, this.heartbeatEvent);
        this.heartbeatSchedule = this.reactor.postAtInterval(heartbeatTopic, ByteBuffer.allocate(0), this.keepaliveInterval);
    }

    /**
     * Reports the next sequence number as held by the sequencer.
     *
     * @return the sequencer's next sequence number
     */
    @Override // io.fixprotocol.silverflash.Sequenced
    public long getNextSeqNo() {
        final long nextSeqNo = this.sequencer.getNextSeqNo();
        return nextSeqNo;
    }

    /**
     * Reads the heartbeat-due flag and re-arms it in one atomic step.
     *
     * @return the flag's value before this call; the flag is left {@code true}
     */
    protected boolean isHeartbeatDue() {
        final boolean wasDue = this.isHeartbeatDue.getAndSet(true);
        return wasDue;
    }

    /**
     * Inserts a message into the message store under this session's id.
     *
     * @param j sequence number passed to the store
     * @param byteBuffer message passed to the store
     * @throws StoreException if the store rejects the insert
     */
    private void persist(long j, ByteBuffer byteBuffer) throws StoreException {
        final MessageStore messageStore = this.store;
        messageStore.insertMessage(this.sessionId, j, byteBuffer);
    }

    /**
     * Resends a single message, preceded by an encoded Retransmission message with a count of 1.
     *
     * @param byteBuffer message to resend; must not be null
     * @param j value written to the Retransmission {@code nextSeqNo} field
     * @param j2 value written to the Retransmission {@code requestTimestamp} field
     * @throws IOException if the transport write fails
     * @throws NullPointerException if {@code byteBuffer} is null
     */
    @Override // io.fixprotocol.silverflash.RecoverableSender
    public void resend(ByteBuffer byteBuffer, long j, long j2) throws IOException {
        Objects.requireNonNull(byteBuffer);
        // Spin until this thread owns the critical section.
        while (!this.criticalSection.compareAndSet(false, true)) {
            Thread.yield();
        }
        try {
            this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
            final int headerOffset = this.frameEncoder.getHeaderLength();
            this.messageHeaderEncoder.wrap(this.mutableBuffer, headerOffset);
            this.messageHeaderEncoder.blockLength(this.retransmissionEncoder.sbeBlockLength()).templateId(this.retransmissionEncoder.sbeTemplateId()).schemaId(this.retransmissionEncoder.sbeSchemaId()).version(this.retransmissionEncoder.sbeSchemaVersion());
            // Offset of the Retransmission body; also the running length used for the frame below.
            final int bodyOffset = headerOffset + this.messageHeaderEncoder.encodedLength();
            this.retransmissionEncoder.wrap(this.mutableBuffer, bodyOffset);
            for (int i = 0; i < 16; i++) {
                this.retransmissionEncoder.sessionId(i, this.uuidAsBytes[i]);
            }
            this.retransmissionEncoder.nextSeqNo(j);
            this.retransmissionEncoder.requestTimestamp(j2);
            this.retransmissionEncoder.count(1L);
            this.frameEncoder.setMessageLength(bodyOffset + this.retransmissionEncoder.encodedLength());
            this.frameEncoder.encodeFrameTrailer();
            this.srcs[0] = this.sendBuffer;
            this.srcs[1] = byteBuffer;
            // NOTE(review): the null slot presumably marks the end of the buffers to write; confirm against Transport.
            this.srcs[2] = null;
            this.transport.write(this.srcs);
            this.isHeartbeatDue.set(false);
        } finally {
            this.criticalSection.compareAndSet(true, false);
        }
    }

    /**
     * Resends a run of messages, preceded by an encoded Retransmission message whose count is the
     * number of messages resent.
     *
     * @param byteBufferArr array holding the messages to resend; must not be null
     * @param i index in {@code byteBufferArr} of the first message to resend
     * @param i2 number of messages to resend; at most two fewer than the scratch array's capacity,
     *     which leaves room for the Retransmission header and the trailing null slot
     * @param j value written to the Retransmission {@code nextSeqNo} field
     * @param j2 value written to the Retransmission {@code requestTimestamp} field
     * @throws IOException if the transport write fails
     * @throws NullPointerException if {@code byteBufferArr} is null
     * @throws ArrayIndexOutOfBoundsException if the requested range does not fit the source array
     *     or the scratch array
     */
    @Override // io.fixprotocol.silverflash.RecoverableSender
    public void resend(ByteBuffer[] byteBufferArr, int i, int i2, long j, long j2) throws IOException {
        Objects.requireNonNull(byteBufferArr);
        // Reject bad ranges before taking the lock or touching shared state.
        if (i < 0 || i2 < 0 || i2 > byteBufferArr.length - i || i2 > this.srcs.length - 2) {
            throw new ArrayIndexOutOfBoundsException("offset " + i + ", count " + i2 + ", source length " + byteBufferArr.length + ", max count " + (this.srcs.length - 2));
        }
        // Spin until this thread owns the critical section.
        while (!this.criticalSection.compareAndSet(false, true)) {
            Thread.yield();
        }
        try {
            // The Retransmission message always starts at byte 0 of sendBuffer; `i` indexes the
            // message array and must not be used as a byte offset.
            this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
            final int headerOffset = this.frameEncoder.getHeaderLength();
            this.messageHeaderEncoder.wrap(this.mutableBuffer, headerOffset);
            this.messageHeaderEncoder.blockLength(this.retransmissionEncoder.sbeBlockLength()).templateId(this.retransmissionEncoder.sbeTemplateId()).schemaId(this.retransmissionEncoder.sbeSchemaId()).version(this.retransmissionEncoder.sbeSchemaVersion());
            final int bodyOffset = headerOffset + this.messageHeaderEncoder.encodedLength();
            this.retransmissionEncoder.wrap(this.mutableBuffer, bodyOffset);
            for (int i3 = 0; i3 < 16; i3++) {
                this.retransmissionEncoder.sessionId(i3, this.uuidAsBytes[i3]);
            }
            this.retransmissionEncoder.nextSeqNo(j);
            this.retransmissionEncoder.requestTimestamp(j2);
            this.retransmissionEncoder.count(i2);
            this.frameEncoder.setMessageLength(bodyOffset + this.retransmissionEncoder.encodedLength());
            this.frameEncoder.encodeFrameTrailer();
            this.isRetransmission.set(true);
            this.srcs[0] = this.sendBuffer;
            // Copy the requested messages in after the header slot.
            System.arraycopy(byteBufferArr, i, this.srcs, 1, i2);
            // NOTE(review): the null slot presumably marks the end of the buffers to write; confirm against Transport.
            this.srcs[i2 + 1] = null;
            this.transport.write(this.srcs);
            this.isHeartbeatDue.set(false);
        } finally {
            this.criticalSection.compareAndSet(true, false);
        }
    }

    /**
     * Sends one message by placing it in the reusable single-slot array and delegating to
     * {@link #send(ByteBuffer[])}.
     *
     * @param byteBuffer message to send; must not be null
     * @return the value returned by {@link #send(ByteBuffer[])}
     * @throws IOException if the transport write fails
     * @throws NullPointerException if {@code byteBuffer} is null
     */
    @Override // io.fixprotocol.silverflash.Sender
    public long send(ByteBuffer byteBuffer) throws IOException {
        // NOTE(review): the shared slot is written before the critical section is taken, so
        // concurrent callers of this overload could overwrite each other's message; confirm
        // whether multiple sending threads are expected.
        this.one[0] = Objects.requireNonNull(byteBuffer);
        return send(this.one);
    }

    /**
     * Passes the messages through the sequencer and writes the result to the transport while
     * holding the critical section, then clears the heartbeat-due flag.
     *
     * @param byteBufferArr messages to send; must not be null
     * @return the sequencer's next sequence number after the write
     * @throws IOException if the transport write fails
     * @throws NullPointerException if {@code byteBufferArr} is null
     */
    @Override // io.fixprotocol.silverflash.Sender
    public long send(ByteBuffer[] byteBufferArr) throws IOException {
        Objects.requireNonNull(byteBufferArr);
        // Spin until this thread owns the critical section.
        for (;;) {
            if (this.criticalSection.compareAndSet(false, true)) {
                break;
            }
            Thread.yield();
        }
        try {
            this.transport.write(this.sequencer.apply(byteBufferArr));
            this.isHeartbeatDue.set(false);
            final long nextSeqNo = this.sequencer.getNextSeqNo();
            return nextSeqNo;
        } finally {
            this.criticalSection.compareAndSet(true, false);
        }
    }

    /**
     * Encodes a FinishedSending message carrying this session's id and the last sequence number
     * (the sequencer's next sequence number minus one), and writes it to the transport.
     *
     * @throws IOException if the transport write fails
     */
    public void sendEndOfStream() throws IOException {
        // Spin until this thread owns the critical section.
        while (!this.criticalSection.compareAndSet(false, true)) {
            Thread.yield();
        }
        try {
            this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
            final int headerOffset = this.frameEncoder.getHeaderLength();
            this.messageHeaderEncoder.wrap(this.mutableBuffer, headerOffset);
            this.messageHeaderEncoder.blockLength(this.finishedSendingEncoder.sbeBlockLength()).templateId(this.finishedSendingEncoder.sbeTemplateId()).schemaId(this.finishedSendingEncoder.sbeSchemaId()).version(this.finishedSendingEncoder.sbeSchemaVersion());
            // Offset of the FinishedSending body; also the running length used for the frame below.
            final int bodyOffset = headerOffset + this.messageHeaderEncoder.encodedLength();
            this.finishedSendingEncoder.wrap(this.mutableBuffer, bodyOffset);
            for (int i = 0; i < 16; i++) {
                this.finishedSendingEncoder.sessionId(i, this.uuidAsBytes[i]);
            }
            this.finishedSendingEncoder.lastSeqNo(this.sequencer.getNextSeqNo() - 1);
            this.frameEncoder.setMessageLength(bodyOffset + this.finishedSendingEncoder.encodedLength());
            this.frameEncoder.encodeFrameTrailer();
            this.transport.write(this.sendBuffer);
        } finally {
            this.criticalSection.compareAndSet(true, false);
        }
    }

    /**
     * Sends an empty message array if the heartbeat-due flag was set when polled; otherwise does
     * nothing. Polling re-arms the flag either way.
     *
     * @throws IOException if the transport write fails
     */
    public void sendHeartbeat() throws IOException {
        if (!isHeartbeatDue()) {
            return;
        }
        send(EMPTY);
    }

    /**
     * Forwards a new next sequence number to the sequencer.
     *
     * @param j the sequence number to set
     * @throws ClassCastException if the sequencer is not a {@code MutableSequence}
     */
    @Override // io.fixprotocol.silverflash.fixp.flow.MutableSequence
    public void setNextSeqNo(long j) {
        final MutableSequence mutableSequencer = (MutableSequence) this.sequencer;
        mutableSequencer.setNextSeqNo(j);
    }
}
