package reactor.aeron.server;

import io.aeron.Image;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronResources;
import reactor.aeron.Connection;
import reactor.aeron.DefaultAeronConnection;
import reactor.aeron.DefaultAeronInbound;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.MessageSubscription;
import reactor.aeron.OnDisposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/server/AeronServerHandler.class */
final class AeronServerHandler implements FragmentHandler, OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronServerHandler.class);
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super Connection, ? extends Publisher<Void>> handler;
    private volatile MessageSubscription subscription;
    private final Map<Integer, Connection> connections = new ConcurrentHashMap(32);
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerHandler(AeronOptions aeronOptions) {
        this.options = aeronOptions;
        this.resources = aeronOptions.resources();
        this.handler = aeronOptions.handler();
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th -> {
            logger.warn("{} failed on doDispose(): {}", this, th.toString());
        }, () -> {
            logger.debug("Disposed {}", this);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<OnDisposable> start() {
        return Mono.defer(() -> {
            String asString = this.options.inboundUri().asString();
            logger.debug("Starting {} on: {}", this, asString);
            return this.resources.subscription(asString, this.options, this, this::onImageAvailable, this::onImageUnavailable).doOnSuccess(messageSubscription -> {
                this.subscription = messageSubscription;
            }).thenReturn(this).doOnSuccess(aeronServerHandler -> {
                logger.debug("Started {} on: {}", this, asString);
            }).doOnError(th -> {
                logger.error("Failed to start {} on: {}", this, asString);
                dispose();
            });
        });
    }

    public void onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
        int sessionId = header.sessionId();
        Connection connection = this.connections.get(Integer.valueOf(sessionId));
        if (connection == null) {
            logger.warn("{}: received message but server connection not found (total connections: {})", Integer.toHexString(sessionId), Integer.valueOf(this.connections.size()));
        } else {
            ((DefaultAeronInbound) connection.inbound()).onFragment(directBuffer, i, i2, header);
        }
    }

    private void onImageAvailable(Image image) {
        int sessionId = image.sessionId();
        String asString = this.options.outboundUri().sessionId(Integer.valueOf(sessionId)).asString();
        if (this.connections.containsKey(Integer.valueOf(sessionId))) {
            logger.error("{}: server connection already exists!?", Integer.toHexString(sessionId));
        } else {
            logger.debug("{}: creating server connection: {}", Integer.toHexString(sessionId), asString);
            this.resources.publication(asString, this.options).map(messagePublication -> {
                return new DefaultAeronConnection(sessionId, new DefaultAeronInbound(), new DefaultAeronOutbound(messagePublication), messagePublication);
            }).doOnSuccess(defaultAeronConnection -> {
                setupConnection(sessionId, defaultAeronConnection);
            }).subscribe((Consumer) null, th -> {
                logger.warn("{}: failed to create server outbound, cause: {}", Integer.toHexString(sessionId), th.toString());
            }, () -> {
                logger.debug("{}: created server connection: {}", Integer.toHexString(sessionId), asString);
            });
        }
    }

    private void onImageUnavailable(Image image) {
        int sessionId = image.sessionId();
        Connection remove = this.connections.remove(Integer.valueOf(sessionId));
        if (remove == null) {
            logger.debug("{}: attempt to remove server connection but it wasn't found (total connections: {})", Integer.toHexString(sessionId), Integer.valueOf(this.connections.size()));
            return;
        }
        logger.debug("{}: server inbound became unavailable", Integer.toHexString(sessionId));
        remove.dispose();
        remove.onDispose().doFinally(signalType -> {
            logger.debug("{}: server connection disposed", Integer.toHexString(sessionId));
        }).subscribe((Consumer) null, th -> {
        });
    }

    private void setupConnection(int i, DefaultAeronConnection defaultAeronConnection) {
        this.connections.put(Integer.valueOf(i), defaultAeronConnection);
        defaultAeronConnection.onDispose().doFinally(signalType -> {
            this.connections.remove(Integer.valueOf(i));
        }).subscribe((Consumer) null, th -> {
        });
        if (this.handler == null) {
            logger.warn("{}: handler function is not specified", Integer.toHexString(i));
            return;
        }
        try {
            if (!defaultAeronConnection.isDisposed()) {
                this.handler.apply(defaultAeronConnection).subscribe(defaultAeronConnection.disposeSubscriber());
            }
        } catch (Exception e) {
            logger.error("{}: unexpected exception occurred on handler.apply(), cause: ", Integer.toHexString(i), e);
            defaultAeronConnection.dispose();
            throw Exceptions.propagate(e);
        }
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);
            ArrayList arrayList = new ArrayList();
            arrayList.add(Optional.ofNullable(this.subscription).map(messageSubscription -> {
                messageSubscription.getClass();
                return Mono.fromRunnable(messageSubscription::dispose).then(messageSubscription.onDispose());
            }).orElse(Mono.empty()));
            Stream<R> map = this.connections.values().stream().peek((v0) -> {
                v0.dispose();
            }).map((v0) -> {
                return v0.onDispose();
            });
            arrayList.getClass();
            map.forEach((v1) -> {
                r1.add(v1);
            });
            return Mono.whenDelayError(arrayList).doFinally(signalType -> {
                this.connections.clear();
            });
        });
    }

    public String toString() {
        return "AeronServerHandler0x" + Integer.toHexString(System.identityHashCode(this));
    }
}
