package reactor.aeron;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.FragmentAssembler;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import java.io.File;
import java.util.Optional;
import java.util.function.Consumer;
import org.agrona.CloseHelper;
import org.agrona.IoUtil;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/AeronResources.class */
public class AeronResources implements OnDisposable {
    private static final Logger logger = Loggers.getLogger(AeronResources.class);
    private final AeronResourcesConfig config;
    private final MonoProcessor<Void> start = MonoProcessor.create();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    private Aeron aeron;
    private MediaDriver mediaDriver;
    private AeronEventLoop eventLoop;

    private AeronResources(AeronResourcesConfig aeronResourcesConfig) {
        this.config = aeronResourcesConfig;
        this.start.doOnTerminate(this::doStart).subscribe(r8 -> {
            logger.info("{} has started", new Object[]{this});
        }, th -> {
            logger.error("Start of {} failed with error: {}", new Object[]{this, th});
            dispose();
        });
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe(r82 -> {
            logger.info("{} closed", new Object[]{this});
        }, th2 -> {
            logger.warn("{} closed with error: {}", new Object[]{this, th2});
        });
    }

    public static AeronResources start() {
        return start(AeronResourcesConfig.defaultConfig());
    }

    public static AeronResources start(AeronResourcesConfig aeronResourcesConfig) {
        AeronResources aeronResources = new AeronResources(aeronResourcesConfig);
        aeronResources.start0();
        return aeronResources;
    }

    private void start0() {
        if (isDisposed()) {
            return;
        }
        this.start.onComplete();
    }

    private void doStart() {
        this.mediaDriver = MediaDriver.launchEmbedded(new MediaDriver.Context().mtuLength(this.config.mtuLength()).imageLivenessTimeoutNs(this.config.imageLivenessTimeout().toNanos()).dirDeleteOnStart(this.config.isDirDeleteOnStart()));
        Aeron.Context context = new Aeron.Context();
        String aeronDirectoryName = this.mediaDriver.aeronDirectoryName();
        context.aeronDirectoryName(aeronDirectoryName);
        this.aeron = Aeron.connect(context);
        this.eventLoop = new AeronEventLoop(this.config.idleStrategySupplier().get());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            deleteAeronDirectory(context);
        }));
        logger.info("{} has initialized embedded media mediaDriver, aeron directory: {}", new Object[]{this, aeronDirectoryName});
    }

    public AeronEventLoop nextEventLoop() {
        return this.eventLoop;
    }

    public Mono<MessagePublication> messagePublication(String str, String str2, int i, AeronOptions aeronOptions, AeronEventLoop aeronEventLoop) {
        ConcurrentPublication addPublication = this.aeron.addPublication(str2, i);
        MessagePublication messagePublication = new MessagePublication(str, this.config.mtuLength(), addPublication, aeronOptions, aeronEventLoop);
        return aeronEventLoop.register(messagePublication).doOnError(th -> {
            logger.error("[{}] Failed to register publication {} on eventLoop {}, cause: {}", new Object[]{str, AeronUtils.format(addPublication), aeronEventLoop, th});
            if (addPublication.isClosed()) {
                return;
            }
            addPublication.close();
        }).doOnSuccess(r9 -> {
            logger.debug("[{}] Added publication: {}", new Object[]{str, messagePublication});
        }).thenReturn(messagePublication);
    }

    public Mono<InnerPoller> controlSubscription(String str, String str2, int i, ControlMessageSubscriber controlMessageSubscriber, AeronEventLoop aeronEventLoop, Consumer<Image> consumer, Consumer<Image> consumer2) {
        return messageSubscription(str + "-control", str2, i, new ControlFragmentHandler(controlMessageSubscriber), aeronEventLoop, consumer, consumer2);
    }

    public Mono<InnerPoller> dataSubscription(String str, String str2, int i, DataMessageSubscriber dataMessageSubscriber, AeronEventLoop aeronEventLoop, Consumer<Image> consumer, Consumer<Image> consumer2) {
        return messageSubscription(str + "-data", str2, i, new DataFragmentHandler(dataMessageSubscriber), aeronEventLoop, consumer, consumer2);
    }

    public void dispose() {
        if (isDisposed()) {
            return;
        }
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.info("{} shutdown initiated", new Object[]{this});
            this.eventLoop.dispose();
            return this.eventLoop.onDispose().doFinally(signalType -> {
                CloseHelper.quietClose(this.aeron);
                CloseHelper.quietClose(this.mediaDriver);
                Optional.ofNullable(this.mediaDriver).map((v0) -> {
                    return v0.context();
                }).ifPresent(context -> {
                    IoUtil.delete(context.aeronDirectory(), true);
                });
                logger.info("{} shutdown complete", new Object[]{this});
            });
        });
    }

    private Mono<InnerPoller> messageSubscription(String str, String str2, int i, FragmentHandler fragmentHandler, AeronEventLoop aeronEventLoop, Consumer<Image> consumer, Consumer<Image> consumer2) {
        Subscription addSubscription = this.aeron.addSubscription(str2, i, image -> {
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] {} available image, imageSessionId={}, imageSource={}", new Object[]{str, AeronUtils.format(str2, i), Integer.valueOf(image.sessionId()), image.sourceIdentity()});
            }
            if (consumer != null) {
                consumer.accept(image);
            }
        }, image2 -> {
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] {} unavailable image, imageSessionId={}, imageSource={}", new Object[]{str, AeronUtils.format(str2, i), Integer.valueOf(image2.sessionId()), image2.sourceIdentity()});
            }
            if (consumer2 != null) {
                consumer2.accept(image2);
            }
        });
        InnerPoller innerPoller = new InnerPoller(aeronEventLoop, addSubscription, new FragmentAssembler(fragmentHandler));
        return aeronEventLoop.register(innerPoller).doOnError(th -> {
            logger.error("[{}] Failed to register subscription {} on eventLoop {}, cause: {}", new Object[]{str, AeronUtils.format(addSubscription), aeronEventLoop, th});
            if (addSubscription.isClosed()) {
                return;
            }
            addSubscription.close();
        }).doOnSuccess(r9 -> {
            logger.debug("[{}] Added subscription: {}", new Object[]{str, AeronUtils.format(addSubscription)});
        }).thenReturn(innerPoller);
    }

    private void deleteAeronDirectory(Aeron.Context context) {
        File aeronDirectory = context.aeronDirectory();
        if (aeronDirectory.exists()) {
            IoUtil.delete(aeronDirectory, true);
        }
    }

    public String toString() {
        return "AeronResources@" + Integer.toHexString(System.identityHashCode(this));
    }
}
