package reactor.aeron;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import java.util.Optional;
import org.agrona.CloseHelper;
import reactor.core.Disposable;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/AeronResources.class */
/**
 * Holder of the Aeron-related resources used by this package: an embedded {@link MediaDriver}, an
 * {@link Aeron} client connected to it, a {@link Poller} and two single-threaded schedulers
 * (one for sending, one for receiving).
 *
 * <p>Instances are created through {@link #start()} / {@link #start(AeronResourcesConfig)} and
 * released through {@link #dispose()} (or {@link #close()}, which delegates to it).
 */
public class AeronResources implements Disposable, AutoCloseable {
    private static final Logger logger = Loggers.getLogger(AeronResources.class);

    private final AeronResourcesConfig config;
    // Completing these processors fires the onStart()/onClose() hooks wired in the constructor.
    private final MonoProcessor<Void> onStart = MonoProcessor.create();
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    private Aeron aeron;
    private MediaDriver mediaDriver;
    private Poller poller;
    private Scheduler sender;
    private Scheduler receiver;
    // JVM shutdown hook registered by onStart(); kept so that onClose() can deregister it.
    private Thread shutdownHook;

    /**
     * Stores the configuration and wires the start/close hooks. Nothing is launched here; the
     * hooks run when the corresponding processor terminates.
     *
     * @param aeronResourcesConfig configuration read by {@link #onStart()}
     */
    private AeronResources(AeronResourcesConfig aeronResourcesConfig) {
        this.config = aeronResourcesConfig;

        this.onStart
            .doOnTerminate(this::onStart)
            .subscribe(
                ignored -> logger.info("{} has started", this),
                th -> {
                    logger.error("Start of {} failed with error: {}", this, th);
                    // Release whatever was created before the failure.
                    dispose();
                });

        this.onClose
            .doOnTerminate(this::onClose)
            .subscribe(
                ignored -> logger.info("{} has stopped", this),
                th -> logger.warn("{} disposed with error: {}", this, th));
    }

    /**
     * Creates and starts an instance using {@link AeronResourcesConfig#defaultConfig()}.
     *
     * @return the new instance
     */
    public static AeronResources start() {
        return start(AeronResourcesConfig.defaultConfig());
    }

    /**
     * Creates and starts an instance using the given configuration.
     *
     * @param aeronResourcesConfig configuration for the new instance
     * @return the new instance
     */
    public static AeronResources start(AeronResourcesConfig aeronResourcesConfig) {
        AeronResources aeronResources = new AeronResources(aeronResourcesConfig);
        aeronResources.start0();
        return aeronResources;
    }

    /** Completes the start processor (which triggers {@link #onStart()}) unless already disposed. */
    private void start0() {
        if (isDisposed()) {
            return;
        }
        this.onStart.onComplete();
    }

    /**
     * Launches the embedded media driver, connects the Aeron client to the driver's directory,
     * creates the schedulers, schedules the poller on the receiver scheduler and registers a JVM
     * shutdown hook that deletes the Aeron directory.
     */
    private void onStart() {
        MediaDriver.Context driverContext = new MediaDriver.Context();
        driverContext.dirDeleteOnStart(this.config.isDirDeleteOnStart());
        this.mediaDriver = MediaDriver.launchEmbedded(driverContext);

        Aeron.Context aeronContext = new Aeron.Context();
        String aeronDirectoryName = this.mediaDriver.aeronDirectoryName();
        aeronContext.aeronDirectoryName(aeronDirectoryName);
        this.aeron = Aeron.connect(aeronContext);

        this.sender = Schedulers.newSingle("reactor-aeron-sender");
        this.receiver = Schedulers.newSingle("reactor-aeron-receiver");

        // The poller is handed a supplier that reports whether the receiver is still active.
        this.poller = new Poller(() -> !this.receiver.isDisposed());
        this.receiver.schedule(this.poller);

        // Best-effort cleanup for the case where the JVM exits without dispose() being called.
        // The hook is remembered so that onClose() can remove it; otherwise every started
        // instance would leave a hook (and its captured context) registered until JVM exit.
        Thread hook = new Thread(() -> {
            try {
                aeronContext.deleteAeronDirectory();
            } catch (Throwable th) {
                logger.warn(
                    "Exception occurred at deleting aeron directory: {}, cause: {}",
                    aeronDirectoryName,
                    th);
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        this.shutdownHook = hook;

        logger.info(
            "{} has initialized embedded media mediaDriver, aeron directory: {}",
            this,
            aeronDirectoryName);
    }

    /**
     * Adds a publication to the Aeron client.
     *
     * @param str tag printed in brackets at the start of the debug log line
     * @param str2 channel passed to {@code Aeron.addPublication}
     * @param i stream id passed to {@code Aeron.addPublication}
     * @param str3 free-form text included in the debug log line
     * @param j session id; used only in the debug log line
     * @return the publication returned by the Aeron client
     */
    public Publication publication(String str, String str2, int i, String str3, long j) {
        ConcurrentPublication addPublication = this.aeron.addPublication(str2, i);
        if (logger.isDebugEnabled()) {
            logger.debug(
                "[{}] Added publication, sessionId={} {} {}",
                str,
                Long.valueOf(j),
                str3,
                AeronUtils.format(str2, i));
        }
        return addPublication;
    }

    /**
     * Adds a subscription (see {@link #addSubscription}) and registers it with the poller as a
     * control subscription.
     *
     * @param str tag printed in brackets at the start of the debug log line
     * @param str2 channel passed to {@code Aeron.addSubscription}
     * @param i stream id passed to {@code Aeron.addSubscription}
     * @param str3 free-form text included in the debug log line
     * @param j session id; used only in the debug log line
     * @param controlMessageSubscriber subscriber handed to the poller together with the subscription
     * @return the created subscription
     */
    public Subscription controlSubscription(String str, String str2, int i, String str3, long j, ControlMessageSubscriber controlMessageSubscriber) {
        Subscription addSubscription = addSubscription(str, str2, i, str3, j);
        this.poller.addControlSubscription(addSubscription, controlMessageSubscriber);
        return addSubscription;
    }

    /**
     * Adds a subscription (see {@link #addSubscription}) and registers it with the poller as a
     * data subscription.
     *
     * @param str tag printed in brackets at the start of the debug log line
     * @param str2 channel passed to {@code Aeron.addSubscription}
     * @param i stream id passed to {@code Aeron.addSubscription}
     * @param str3 free-form text included in the debug log line
     * @param j session id; used only in the debug log line
     * @param dataMessageSubscriber subscriber handed to the poller together with the subscription
     * @return the created subscription
     */
    public Subscription dataSubscription(String str, String str2, int i, String str3, long j, DataMessageSubscriber dataMessageSubscriber) {
        Subscription addSubscription = addSubscription(str, str2, i, str3, j);
        this.poller.addDataSubscription(addSubscription, dataMessageSubscriber);
        return addSubscription;
    }

    /**
     * Removes the subscription from the poller and closes it quietly. Does nothing for
     * {@code null}.
     *
     * @param subscription subscription to close, may be {@code null}
     */
    public void close(Subscription subscription) {
        if (subscription == null) {
            return;
        }
        this.poller.removeSubscription(subscription);
        CloseHelper.quietClose(subscription);
    }

    /**
     * Closes the publication via {@code CloseHelper.quietClose}.
     *
     * @param publication publication to close
     */
    public void close(Publication publication) {
        CloseHelper.quietClose(publication);
    }

    /** Delegates to {@link #dispose()}. */
    @Override // java.lang.AutoCloseable
    public void close() {
        dispose();
    }

    /** Completes the close processor (which triggers {@link #onClose()}) unless already disposed. */
    @Override
    public void dispose() {
        if (isDisposed()) {
            return;
        }
        this.onClose.onComplete();
    }

    /**
     * Releases everything {@link #onStart()} created. Every step tolerates a resource that was
     * never created, since this also runs after a failed start.
     */
    private void onClose() {
        logger.info("{} shutdown initiated", this);

        disposeIfActive(this.sender);
        disposeIfActive(this.receiver);

        CloseHelper.quietClose(this.aeron);
        CloseHelper.quietClose(this.mediaDriver);

        if (this.mediaDriver != null) {
            MediaDriver.Context driverContext = this.mediaDriver.context();
            if (driverContext != null) {
                driverContext.deleteAeronDirectory();
            }
        }

        // Only reached when the directory deletion above did not throw, so the hook is no
        // longer needed for this instance.
        removeShutdownHook();

        logger.info("{} shutdown complete", this);
    }

    /** Disposes the scheduler when it exists and is not already disposed. */
    private static void disposeIfActive(Scheduler scheduler) {
        if (scheduler != null && !scheduler.isDisposed()) {
            scheduler.dispose();
        }
    }

    /** Deregisters the JVM shutdown hook installed by {@link #onStart()}, if there is one. */
    private void removeShutdownHook() {
        Thread hook = this.shutdownHook;
        if (hook == null) {
            return;
        }
        this.shutdownHook = null;
        try {
            Runtime.getRuntime().removeShutdownHook(hook);
        } catch (IllegalStateException ignored) {
            // The JVM is already shutting down; hooks can no longer be removed and this one
            // will run on its own.
        }
    }

    /**
     * Reports the disposed state of the internal close processor.
     *
     * @return {@code true} when the close processor reports itself as disposed
     */
    @Override
    public boolean isDisposed() {
        return this.onClose.isDisposed();
    }

    /**
     * Creates an {@link AeronWriteSequencer} bound to the sender scheduler.
     *
     * @param str tag passed to the sequencer and printed in brackets in the debug log line
     * @param messagePublication publication passed to the sequencer
     * @param j session id passed to the sequencer and printed in the debug log line
     * @return the new sequencer
     */
    public AeronWriteSequencer writeSequencer(String str, MessagePublication messagePublication, long j) {
        AeronWriteSequencer aeronWriteSequencer = new AeronWriteSequencer(this.sender, str, messagePublication, j);
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Created {}, sessionId={}", str, aeronWriteSequencer, Long.valueOf(j));
        }
        return aeronWriteSequencer;
    }

    /**
     * Adds a subscription to the Aeron client without registering it with the poller.
     *
     * @param str tag printed in brackets at the start of the debug log line
     * @param str2 channel passed to {@code Aeron.addSubscription}
     * @param i stream id passed to {@code Aeron.addSubscription}
     * @param str3 free-form text included in the debug log line
     * @param j session id; used only in the debug log line
     * @return the subscription returned by the Aeron client
     */
    Subscription addSubscription(String str, String str2, int i, String str3, long j) {
        Subscription addSubscription = this.aeron.addSubscription(str2, i);
        if (logger.isDebugEnabled()) {
            logger.debug(
                "[{}] Added subscription, sessionId={} {} {}",
                str,
                Long.valueOf(j),
                str3,
                AeronUtils.format(str2, i));
        }
        return addSubscription;
    }

    /**
     * @return {@code "AeronResources@"} followed by this object's identity hash code in hex
     */
    @Override
    public String toString() {
        return "AeronResources@" + Integer.toHexString(System.identityHashCode(this));
    }
}
