package reactor.aeron;

import io.aeron.Aeron;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.Configuration;
import io.aeron.driver.MediaDriver;
import java.io.File;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/aeron/AeronResources.class */
public final class AeronResources implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronResources.class);
    private int numOfWorkers;
    private Supplier<IdleStrategy> workerIdleStrategySupplier;
    private Aeron.Context aeronContext;
    private MediaDriver.Context mediaContext;
    private Aeron aeron;
    private MediaDriver mediaDriver;
    private AeronEventLoopGroup eventLoopGroup;
    private final MonoProcessor<Void> start;
    private final MonoProcessor<Void> onStart;
    private final MonoProcessor<Void> dispose;
    private final MonoProcessor<Void> onDispose;

    public AeronResources() {
        this.numOfWorkers = Runtime.getRuntime().availableProcessors();
        this.workerIdleStrategySupplier = AeronResources::defaultBackoffIdleStrategy;
        this.aeronContext = new Aeron.Context().errorHandler(th -> {
            logger.warn("Aeron exception occurred: " + th, th);
        });
        this.mediaContext = new MediaDriver.Context().errorHandler(th2 -> {
            logger.warn("Exception occurred on MediaDriver: " + th2, th2);
        }).warnIfDirectoryExists(true).dirDeleteOnStart(true).publicationReservedSessionIdLow(0).publicationReservedSessionIdHigh(Integer.MAX_VALUE);
        this.start = MonoProcessor.create();
        this.onStart = MonoProcessor.create();
        this.dispose = MonoProcessor.create();
        this.onDispose = MonoProcessor.create();
        Mono doOnSuccess = this.start.then(doStart()).doOnSuccess(r3 -> {
            this.onStart.onComplete();
        });
        MonoProcessor<Void> monoProcessor = this.onStart;
        monoProcessor.getClass();
        doOnSuccess.doOnError(monoProcessor::onError).subscribe((Consumer) null, th3 -> {
            logger.error("{} failed to start, cause: {}", this, th3.toString());
            dispose();
        });
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th4 -> {
            logger.warn("{} failed on doDispose(): {}", this, th4.toString());
        }, () -> {
            logger.debug("Disposed {}", this);
        });
    }

    private AeronResources(Aeron.Context context, MediaDriver.Context context2) {
        this();
        copy(context);
        copy(context2);
    }

    private AeronResources copy() {
        return new AeronResources(this.aeronContext, this.mediaContext);
    }

    private void copy(MediaDriver.Context context) {
        this.mediaContext.aeronDirectoryName(context.aeronDirectoryName()).dirDeleteOnStart(context.dirDeleteOnStart()).imageLivenessTimeoutNs(context.imageLivenessTimeoutNs()).mtuLength(context.mtuLength()).driverTimeoutMs(context.driverTimeoutMs()).errorHandler(context.errorHandler()).threadingMode(context.threadingMode()).applicationSpecificFeedback(context.applicationSpecificFeedback()).cachedEpochClock(context.cachedEpochClock()).cachedNanoClock(context.cachedNanoClock()).clientLivenessTimeoutNs(context.clientLivenessTimeoutNs()).conductorIdleStrategy(context.conductorIdleStrategy()).conductorThreadFactory(context.conductorThreadFactory()).congestControlSupplier(context.congestionControlSupplier()).counterFreeToReuseTimeoutNs(context.counterFreeToReuseTimeoutNs()).countersManager(context.countersManager()).countersMetaDataBuffer(context.countersMetaDataBuffer()).countersValuesBuffer(context.countersValuesBuffer()).epochClock(context.epochClock()).warnIfDirectoryExists(context.warnIfDirectoryExists()).useWindowsHighResTimer(context.useWindowsHighResTimer()).useConcurrentCountersManager(context.useConcurrentCountersManager()).unicastFlowControlSupplier(context.unicastFlowControlSupplier()).multicastFlowControlSupplier(context.multicastFlowControlSupplier()).timerIntervalNs(context.timerIntervalNs()).termBufferSparseFile(context.termBufferSparseFile()).tempBuffer(context.tempBuffer()).systemCounters(context.systemCounters()).statusMessageTimeoutNs(context.statusMessageTimeoutNs()).spiesSimulateConnection(context.spiesSimulateConnection()).sharedThreadFactory(context.sharedThreadFactory()).sharedNetworkThreadFactory(context.sharedNetworkThreadFactory()).sharedNetworkIdleStrategy(context.sharedNetworkIdleStrategy()).sharedIdleStrategy(context.sharedIdleStrategy()).senderThreadFactory(context.senderThreadFactory()).senderIdleStrategy(context.senderIdleStrategy()).sendChannelEndpointSupplier(context.sendChannelEndpointSupplier()).receiverThreadFactory(context.receiverThreadFactory()).receiverIdleStrategy(context.receiverIdleStrategy()).receiveChannelEndpointThreadLocals(context.receiveChannelEndpointThreadLocals()).receiveChannelEndpointSupplier(context.receiveChannelEndpointSupplier()).publicationUnblockTimeoutNs(context.publicationUnblockTimeoutNs()).publicationTermBufferLength(context.publicationTermBufferLength()).publicationReservedSessionIdLow(context.publicationReservedSessionIdLow()).publicationReservedSessionIdHigh(context.publicationReservedSessionIdHigh()).publicationLingerTimeoutNs(context.publicationLingerTimeoutNs()).publicationConnectionTimeoutNs(context.publicationConnectionTimeoutNs()).performStorageChecks(context.performStorageChecks()).nanoClock(context.nanoClock()).lossReport(context.lossReport()).ipcTermBufferLength(context.ipcTermBufferLength()).ipcMtuLength(context.ipcMtuLength()).initialWindowLength(context.initialWindowLength()).filePageSize(context.filePageSize()).errorLog(context.errorLog());
    }

    private void copy(Aeron.Context context) {
        this.aeronContext.resourceLingerDurationNs(context.resourceLingerDurationNs()).keepAliveInterval(context.keepAliveInterval()).errorHandler(context.errorHandler()).driverTimeoutMs(context.driverTimeoutMs()).availableImageHandler(context.availableImageHandler()).unavailableImageHandler(context.unavailableImageHandler()).idleStrategy(context.idleStrategy()).aeronDirectoryName(context.aeronDirectoryName()).availableCounterHandler(context.availableCounterHandler()).unavailableCounterHandler(context.unavailableCounterHandler()).useConductorAgentInvoker(context.useConductorAgentInvoker()).threadFactory(context.threadFactory()).epochClock(context.epochClock()).clientLock(context.clientLock()).nanoClock(context.nanoClock());
    }

    private static BackoffIdleStrategy defaultBackoffIdleStrategy() {
        return new BackoffIdleStrategy(10L, 20L, 1000L, Configuration.IDLE_MAX_PARK_NS);
    }

    private static String generateRandomTmpDirName() {
        return IoUtil.tmpDirName() + "aeron-" + System.getProperty("user.name", "default") + '-' + UUID.randomUUID().toString();
    }

    public AeronResources aeron(UnaryOperator<Aeron.Context> unaryOperator) {
        AeronResources copy = copy();
        return new AeronResources((Aeron.Context) unaryOperator.apply(copy.aeronContext), copy.mediaContext);
    }

    public AeronResources media(UnaryOperator<MediaDriver.Context> unaryOperator) {
        AeronResources copy = copy();
        return new AeronResources(copy.aeronContext, (MediaDriver.Context) unaryOperator.apply(copy.mediaContext));
    }

    public AeronResources useTmpDir() {
        return media(context -> {
            return context.aeronDirectoryName(generateRandomTmpDirName());
        });
    }

    public AeronResources singleWorker() {
        return numOfWorkers(1);
    }

    public AeronResources numOfWorkers(int i) {
        AeronResources copy = copy();
        copy.numOfWorkers = i;
        return copy;
    }

    public AeronResources workerIdleStrategySupplier(Supplier<IdleStrategy> supplier) {
        AeronResources copy = copy();
        copy.workerIdleStrategySupplier = supplier;
        return copy;
    }

    public Mono<AeronResources> start() {
        return Mono.defer(() -> {
            this.start.onComplete();
            return this.onStart.thenReturn(this);
        });
    }

    private Mono<Void> doStart() {
        return Mono.fromRunnable(() -> {
            this.mediaDriver = MediaDriver.launchEmbedded(this.mediaContext);
            this.aeronContext.aeronDirectoryName(this.mediaDriver.aeronDirectoryName());
            this.aeron = Aeron.connect(this.aeronContext);
            this.eventLoopGroup = new AeronEventLoopGroup("reactor-aeron", this.numOfWorkers, this.workerIdleStrategySupplier);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                deleteAeronDirectory(this.aeronContext.aeronDirectory());
            }));
            logger.debug("{} has initialized embedded media driver, aeron directory: {}", this, this.aeronContext.aeronDirectoryName());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<DefaultAeronInbound> inbound(Image image, MessageSubscription messageSubscription) {
        return Mono.defer(() -> {
            AeronEventLoop next = this.eventLoopGroup.next();
            return next.registerInbound(new DefaultAeronInbound(image, next, messageSubscription)).doOnError(th -> {
                logger.error("{} failed on registerInbound(), cause: {}", this, th.toString());
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<MessagePublication> publication(String str, int i, AeronOptions aeronOptions) {
        return Mono.defer(() -> {
            return aeronPublication(str, i).subscribeOn(Schedulers.parallel()).doOnError(th -> {
                logger.error("{} failed on aeronPublication(), channel: {}, cause: {}", new Object[]{this, str, th.toString()});
            }).flatMap(publication -> {
                AeronEventLoop next = this.eventLoopGroup.next();
                return next.registerPublication(new MessagePublication(publication, aeronOptions, next)).doOnError(th2 -> {
                    logger.error("{} failed on registerPublication(), cause: {}", this, th2.toString());
                    if (publication.isClosed()) {
                        return;
                    }
                    publication.close();
                });
            });
        });
    }

    private Mono<Publication> aeronPublication(String str, int i) {
        return Mono.fromCallable(() -> {
            logger.debug("Adding aeron.Publication for channel {}", str);
            long nanoTime = System.nanoTime();
            ExclusivePublication addExclusivePublication = this.aeron.addExclusivePublication(str, i);
            logger.debug("Added aeron.Publication for channel {}, spent: {} ns", str, Long.valueOf(Duration.ofNanos(System.nanoTime() - nanoTime).toNanos()));
            return addExclusivePublication;
        });
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<MessageSubscription> subscription(String str, int i, Consumer<Image> consumer, Consumer<Image> consumer2) {
        return Mono.defer(() -> {
            return aeronSubscription(str, i, consumer, consumer2).subscribeOn(Schedulers.parallel()).doOnError(th -> {
                logger.error("{} failed on aeronSubscription(), channel: {}, cause: {}", new Object[]{this, str, th.toString()});
            }).flatMap(subscription -> {
                AeronEventLoop next = this.eventLoopGroup.next();
                return next.registerSubscription(new MessageSubscription(subscription, next)).doOnError(th2 -> {
                    logger.error("{} failed on registerSubscription(), cause: {}", this, th2.toString());
                    if (subscription.isClosed()) {
                        return;
                    }
                    subscription.close();
                });
            });
        });
    }

    private Mono<Subscription> aeronSubscription(String str, int i, Consumer<Image> consumer, Consumer<Image> consumer2) {
        return Mono.fromCallable(() -> {
            logger.debug("Adding aeron.Subscription for channel {}", str);
            long nanoTime = System.nanoTime();
            Subscription addSubscription = this.aeron.addSubscription(str, i, image -> {
                logger.debug("{} onImageAvailable: {} {}", new Object[]{this, Integer.toHexString(image.sessionId()), image.sourceIdentity()});
                Optional.ofNullable(consumer).ifPresent(consumer3 -> {
                    consumer3.accept(image);
                });
            }, image2 -> {
                logger.debug("{} onImageUnavailable: {} {}", new Object[]{this, Integer.toHexString(image2.sessionId()), image2.sourceIdentity()});
                Optional.ofNullable(consumer2).ifPresent(consumer3 -> {
                    consumer3.accept(image2);
                });
            });
            logger.debug("Added aeron.Subscription for channel {}, spent: {} ns", str, Long.valueOf(Duration.ofNanos(System.nanoTime() - nanoTime).toNanos()));
            return addSubscription;
        });
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);
            AeronEventLoopGroup aeronEventLoopGroup = this.eventLoopGroup;
            aeronEventLoopGroup.getClass();
            return Mono.fromRunnable(aeronEventLoopGroup::dispose).then(this.eventLoopGroup.onDispose()).doFinally(signalType -> {
                CloseHelper.quietClose(this.aeron);
                CloseHelper.quietClose(this.mediaDriver);
                Optional.ofNullable(this.aeronContext).ifPresent(context -> {
                    IoUtil.delete(context.aeronDirectory(), true);
                });
            });
        });
    }

    private void deleteAeronDirectory(File file) {
        if (file.exists()) {
            IoUtil.delete(file, true);
            logger.debug("{} deleted aeron directory {}", this, file);
        }
    }

    public String toString() {
        return "AeronResources" + Integer.toHexString(System.identityHashCode(this));
    }
}
