package io.fixprotocol.silverflash.reactor.bridge;

import io.fixprotocol.silverflash.Receiver;
import io.fixprotocol.silverflash.buffer.BufferSupplier;
import io.fixprotocol.silverflash.buffer.SingleBufferSupplier;
import io.fixprotocol.silverflash.fixp.messages.EventDecoder;
import io.fixprotocol.silverflash.fixp.messages.EventEncoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderDecoder;
import io.fixprotocol.silverflash.fixp.messages.MessageHeaderEncoder;
import io.fixprotocol.silverflash.frame.FrameSpliterator;
import io.fixprotocol.silverflash.frame.MessageFrameEncoder;
import io.fixprotocol.silverflash.frame.MessageLengthFrameEncoder;
import io.fixprotocol.silverflash.frame.MessageLengthFrameSpliterator;
import io.fixprotocol.silverflash.reactor.Dispatcher;
import io.fixprotocol.silverflash.reactor.EventReactor;
import io.fixprotocol.silverflash.reactor.Subscription;
import io.fixprotocol.silverflash.reactor.Topic;
import io.fixprotocol.silverflash.reactor.Topics;
import io.fixprotocol.silverflash.transport.Transport;
import io.fixprotocol.silverflash.transport.TransportConsumer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

/* loaded from: input_file:io/fixprotocol/silverflash/reactor/bridge/EventReactorWithBridge.class */
/**
 * An {@link EventReactor} for {@link ByteBuffer} payloads that is bridged to a {@link Transport}.
 *
 * <p>Outbound: topics registered through {@link #forward(Topic)} are subscribed with an internal
 * sentinel receiver; when the dispatcher sees that sentinel it encodes the topic and payload as an
 * Event message and writes it to the transport instead of delivering it locally.
 *
 * <p>Inbound: buffers received from the transport are split into frames, each frame is decoded as
 * an Event message, and its payload is posted to this reactor under the decoded topic.
 */
public class EventReactorWithBridge extends EventReactor<ByteBuffer> {
    /**
     * Sentinel receiver marking "forward to the transport" subscriptions. It is recognised by
     * identity in {@link ForwardDispatcher#dispatch}; its own body does nothing.
     */
    private static final Receiver theForwarder = byteBuffer -> {
    };
    /** Buffer supplier handed to the transport when it is opened. */
    private final BufferSupplier buffers;
    /** Splits buffers received from the transport into individual frames. */
    private final FrameSpliterator frameSpliter;
    /** Reusable read-only view wrapped around each inbound frame before decoding. */
    private final DirectBuffer immutableBuffer;
    private final MessageHeaderDecoder messageHeaderDecoder;
    /** Decodes one inbound frame and posts its payload to this reactor. */
    private final Consumer<? super ByteBuffer> inboundReceiver;
    /** Subscriptions created by {@link #forward(Topic)}, keyed by topic. */
    private final Map<Topic, Subscription> subscriptions;
    private final Transport transport;
    /** Callback registered with the transport in {@link #open()}. */
    private final TransportConsumer transportConsumer;

    /**
     * Builder for {@link EventReactorWithBridge}. {@link #withTransport(Transport)} supplies the
     * transport and installs the forwarding dispatcher.
     */
    public static class Builder extends EventReactor.Builder<ByteBuffer, EventReactor<ByteBuffer>, Builder> {
        private Transport transport;

        /**
         * Creates an {@link EventReactorWithBridge} configured by this builder.
         *
         * <p>NOTE(review): the decompiler renamed this method from {@code build} when merging it
         * with its bridge method; the name is kept as found so existing references still resolve.
         * Confirm against the superclass declaration.
         *
         * @return a new reactor instance
         */
        @Override
        public EventReactor<ByteBuffer> build2() {
            return new EventReactorWithBridge(this);
        }

        /**
         * Sets the transport used by the bridge and installs a dispatcher that forwards events
         * to it.
         *
         * @param transport transport to write forwarded events to and to receive events from
         * @return this builder
         */
        public Builder withTransport(Transport transport) {
            this.transport = transport;
            withDispatcher(new ForwardDispatcher(transport, EventReactorWithBridge.theForwarder));
            return this;
        }
    }

    /**
     * Dispatcher that delivers events to local receivers, except for the designated forwarder
     * receiver, for which the event is encoded and written to the transport.
     */
    private static class ForwardDispatcher implements Dispatcher<ByteBuffer> {
        private final Receiver forwarder;
        private final Transport transport;
        /** Reused for every forwarded message; its contents are overwritten on each call. */
        private final ByteBuffer sendBuffer = ByteBuffer.allocateDirect(2048).order(ByteOrder.nativeOrder());
        private final MessageFrameEncoder frameEncoder = new MessageLengthFrameEncoder();
        private final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder();
        private final EventEncoder eventEncoder = new EventEncoder();
        /** Mutable view over {@link #sendBuffer} used by the message encoders. */
        private final MutableDirectBuffer mutableBuffer = new UnsafeBuffer(this.sendBuffer);
        /** Reusable read-only view wrapped around the payload being forwarded. */
        private final DirectBuffer immutableBuffer = new UnsafeBuffer(new byte[0]);

        /**
         * @param transport transport that forwarded events are written to
         * @param receiver the receiver instance whose subscriptions mean "forward"
         */
        public ForwardDispatcher(Transport transport, Receiver receiver) {
            this.transport = transport;
            this.forwarder = receiver;
        }

        /**
         * Delivers an event: forwards it to the transport when {@code receiver} is the forwarder
         * instance, otherwise flips the buffer and hands it to the receiver.
         *
         * @param topic topic the event was posted on
         * @param byteBuffer event payload; it is flipped before use on both paths
         * @param receiver subscriber selected for this event
         * @throws IOException if writing to the transport fails
         */
        @Override
        public void dispatch(Topic topic, ByteBuffer byteBuffer, Receiver receiver) throws IOException {
            if (receiver == this.forwarder) {
                forward(topic, byteBuffer);
            } else {
                byteBuffer.flip();
                receiver.accept(byteBuffer);
            }
        }

        /**
         * Encodes frame header, message header and an Event message (topic + payload) into the
         * send buffer and writes it to the transport.
         *
         * @param topic topic to encode into the message
         * @param byteBuffer payload; flipped here before it is copied into the message
         * @throws IOException if writing to the transport fails
         */
        private void forward(Topic topic, ByteBuffer byteBuffer) throws IOException {
            this.sendBuffer.clear();
            this.frameEncoder.wrap(this.sendBuffer, 0).encodeFrameHeader();
            final int messageHeaderOffset = this.frameEncoder.getHeaderLength();
            this.messageHeaderEncoder.wrap(this.mutableBuffer, messageHeaderOffset);
            this.messageHeaderEncoder
                    .blockLength(this.eventEncoder.sbeBlockLength())
                    .templateId(this.eventEncoder.sbeTemplateId())
                    .schemaId(this.eventEncoder.sbeSchemaId())
                    .version(this.eventEncoder.sbeSchemaVersion());
            // Offset of the Event body: frame header followed by the message header.
            final int eventOffset = messageHeaderOffset + this.messageHeaderEncoder.encodedLength();
            this.eventEncoder.wrap(this.mutableBuffer, eventOffset);
            // NOTE(review): getBytes() uses the platform default charset; confirm it matches the
            // character encoding the Event decoder applies to the topic field.
            final byte[] topicBytes = Topics.toString(topic).getBytes();
            this.eventEncoder.putTopic(topicBytes, 0, topicBytes.length);
            byteBuffer.flip();
            this.immutableBuffer.wrap(byteBuffer);
            this.eventEncoder.putPayload(this.immutableBuffer, byteBuffer.position(), byteBuffer.remaining());
            // The decompiled source referenced an undeclared temporary here; it is reconstructed
            // as the running offset computed above (the only offset accumulated before this call).
            // NOTE(review): confirm against the original source that the frame's message length
            // is meant to include the frame header length.
            this.frameEncoder.setMessageLength(eventOffset + this.eventEncoder.encodedLength());
            this.frameEncoder.encodeFrameTrailer();
            this.transport.write(this.sendBuffer);
        }
    }

    /**
     * @return a new {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates the reactor and its inbound decoding pipeline from the builder's settings.
     *
     * @param builder source of the superclass configuration and of the transport
     */
    protected EventReactorWithBridge(Builder builder) {
        super(builder);
        this.buffers = new SingleBufferSupplier(ByteBuffer.allocateDirect(16384).order(ByteOrder.nativeOrder()));
        this.frameSpliter = new MessageLengthFrameSpliterator();
        this.immutableBuffer = new UnsafeBuffer(new byte[0]);
        this.messageHeaderDecoder = new MessageHeaderDecoder();
        this.inboundReceiver = new Consumer<ByteBuffer>() {
            private final EventDecoder eventDecoder = new EventDecoder();
            /** Reused for every inbound event; contents are overwritten on each frame. */
            private final ByteBuffer payload = ByteBuffer.allocateDirect(16384).order(ByteOrder.nativeOrder());
            private final MutableDirectBuffer payloadBuffer = new UnsafeBuffer(this.payload);

            /**
             * Decodes one frame as an Event message and posts its payload under the decoded topic.
             */
            @Override
            public void accept(ByteBuffer byteBuffer) {
                EventReactorWithBridge.this.immutableBuffer.wrap(byteBuffer);
                final int frameStart = byteBuffer.position();
                EventReactorWithBridge.this.messageHeaderDecoder.wrap(EventReactorWithBridge.this.immutableBuffer, frameStart);
                // NOTE(review): the decoder is wrapped with its own compiled-in block length and
                // schema version rather than the values carried in the received message header.
                this.eventDecoder.wrap(
                        EventReactorWithBridge.this.immutableBuffer,
                        frameStart + EventReactorWithBridge.this.messageHeaderDecoder.encodedLength(),
                        this.eventDecoder.sbeBlockLength(),
                        this.eventDecoder.sbeSchemaVersion());
                final Topic topic = Topics.parse(this.eventDecoder.topic());
                // NOTE(review): the value returned by getPayload is discarded and the position and
                // limit of `payload` are never adjusted here; verify that post() receives the
                // buffer in the state it expects.
                this.eventDecoder.getPayload(this.payloadBuffer, 0, this.payload.capacity());
                EventReactorWithBridge.this.post(topic, this.payload);
            }
        };
        this.subscriptions = new HashMap<>();
        this.transportConsumer = new TransportConsumer() {
            /** Splits the received buffer into frames and feeds each one to the inbound receiver. */
            @Override
            public void accept(ByteBuffer byteBuffer) {
                EventReactorWithBridge.this.frameSpliter.wrap(byteBuffer);
                EventReactorWithBridge.this.frameSpliter.forEachRemaining(EventReactorWithBridge.this.inboundReceiver);
            }

            /** No action is taken on connect. */
            @Override
            public void connected() {
            }

            /** No action is taken on disconnect. */
            @Override
            public void disconnected() {
            }
        };
        this.transport = builder.transport;
    }

    /**
     * No-op; the body is empty.
     *
     * @param byteBuffer ignored
     */
    public void accept(ByteBuffer byteBuffer) {
    }

    /**
     * Closes the reactor first and then the transport.
     */
    @Override
    public void close() {
        super.close();
        this.transport.close();
    }

    /**
     * Starts forwarding events posted on {@code topic} to the transport by subscribing the
     * internal forwarder receiver. The subscription is recorded only when {@code subscribe}
     * returns a non-null value.
     *
     * @param topic topic whose events should be forwarded
     */
    public void forward(Topic topic) {
        final Subscription subscription = subscribe(topic, theForwarder);
        if (subscription != null) {
            this.subscriptions.put(topic, subscription);
        }
    }

    /**
     * Opens the reactor and the transport and blocks until both open operations have finished.
     *
     * <p>If either operation fails, or the calling thread is interrupted while waiting, this
     * instance is closed and the returned future is completed exceptionally. A failure of the
     * reactor takes precedence over a failure of the transport. The thread's interrupt status is
     * restored when an interruption was observed.
     *
     * @return an already-completed future holding this reactor on success, or the failure cause
     */
    @Override
    public CompletableFuture<? extends EventReactor<ByteBuffer>> open() {
        final CompletableFuture<?> reactorFuture = super.open();
        final CompletableFuture<? extends Transport> transportFuture =
                this.transport.open(this.buffers, this.transportConsumer);

        final Exception reactorOutcome = awaitOutcome(reactorFuture);
        final Exception transportOutcome = awaitOutcome(transportFuture);
        // Restore the interrupt only after both waits so the second wait is not cut short.
        if (reactorOutcome instanceof InterruptedException || transportOutcome instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }

        final CompletableFuture<EventReactor<ByteBuffer>> result = new CompletableFuture<>();
        final Exception firstFailure = reactorOutcome != null ? reactorOutcome : transportOutcome;
        if (firstFailure == null) {
            result.complete(this);
        } else {
            Throwable cause = firstFailure;
            if (firstFailure instanceof ExecutionException && firstFailure.getCause() != null) {
                // Report the underlying failure rather than the ExecutionException wrapper.
                cause = firstFailure.getCause();
            }
            result.completeExceptionally(cause);
            close();
        }
        return result;
    }

    /**
     * Waits for {@code future} and returns the exception thrown by the wait, without altering the
     * thread's interrupt status.
     *
     * @param future future to wait on
     * @return {@code null} if the future completed normally, otherwise the
     *     {@link InterruptedException} or {@link ExecutionException} raised by {@code get()}
     */
    private static Exception awaitOutcome(CompletableFuture<?> future) {
        try {
            future.get();
            return null;
        } catch (InterruptedException | ExecutionException e) {
            return e;
        }
    }

    /**
     * Removes the recorded subscription for {@code topic}, if any.
     *
     * <p>NOTE(review): only the map entry is removed; nothing visible here cancels the underlying
     * subscription, so events on the topic may continue to be forwarded. Confirm whether
     * {@code Subscription} offers a cancel operation that should be invoked.
     *
     * @param topic topic to stop tracking
     */
    public void stopForwarding(Topic topic) {
        this.subscriptions.remove(topic);
    }
}
