package de.quantummaid.eventmaid.messagefunction;

import de.quantummaid.eventmaid.exceptions.AlreadyClosedException;
import de.quantummaid.eventmaid.identification.CorrelationId;
import de.quantummaid.eventmaid.identification.MessageId;
import de.quantummaid.eventmaid.messagebus.MessageBus;
import de.quantummaid.eventmaid.messagefunction.internal.ExpectedResponseFuture;
import de.quantummaid.eventmaid.messagefunction.internal.SubscriptionContainer;
import de.quantummaid.eventmaid.processingcontext.EventType;
import de.quantummaid.eventmaid.processingcontext.ProcessingContext;
import lombok.Generated;
import lombok.NonNull;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:de/quantummaid/eventmaid/messagefunction/MessageFunctionImpl.class */
public final class MessageFunctionImpl implements MessageFunction {
    private final MessageBus messageBus;
    private volatile boolean closed;

    /* loaded from: input_file:de/quantummaid/eventmaid/messagefunction/MessageFunctionImpl$RequestHandle.class */
    private static final class RequestHandle {
        private final ExpectedResponseFuture responseFuture;
        private final MessageBus messageBus;
        private final SubscriptionContainer subscriptionContainer;
        private volatile boolean alreadyFinishedOrCancelled;

        RequestHandle(MessageBus messageBus) {
            this.messageBus = messageBus;
            this.subscriptionContainer = SubscriptionContainer.subscriptionContainer(messageBus);
            this.responseFuture = ExpectedResponseFuture.expectedResponseFuture(this.subscriptionContainer);
        }

        public synchronized void send(EventType eventType, Object obj) {
            MessageId newUniqueMessageId = MessageId.newUniqueMessageId();
            CorrelationId correlationIdFor = CorrelationId.correlationIdFor(newUniqueMessageId);
            this.subscriptionContainer.setSubscriptionIds(this.messageBus.subscribe(correlationIdFor, this::fulFillFuture), this.messageBus.onException(correlationIdFor, (processingContext, exc) -> {
                fulFillFuture(exc);
            }), this.messageBus.onException(eventType, (processingContext2, exc2) -> {
                if (processingContext2.getPayload() == obj) {
                    fulFillFuture(exc2);
                }
            }));
            try {
                this.messageBus.send(ProcessingContext.processingContext(eventType, newUniqueMessageId, obj));
            } catch (Exception e) {
                fulFillFuture(e);
            }
        }

        private synchronized void fulFillFuture(ProcessingContext<Object> processingContext) {
            if (this.alreadyFinishedOrCancelled) {
                return;
            }
            this.alreadyFinishedOrCancelled = true;
            this.responseFuture.fullFill(processingContext);
        }

        private synchronized void fulFillFuture(Exception exc) {
            if (this.alreadyFinishedOrCancelled) {
                return;
            }
            this.alreadyFinishedOrCancelled = true;
            this.responseFuture.fullFillWithException(exc);
        }

        @Generated
        public ExpectedResponseFuture getResponseFuture() {
            return this.responseFuture;
        }
    }

    private MessageFunctionImpl(@NonNull MessageBus messageBus) {
        if (messageBus == null) {
            throw new NullPointerException("messageBus is marked non-null but is null");
        }
        this.messageBus = messageBus;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MessageFunctionImpl messageFunction(@NonNull MessageBus messageBus) {
        if (messageBus == null) {
            throw new NullPointerException("messageBus is marked non-null but is null");
        }
        return new MessageFunctionImpl(messageBus);
    }

    @Override // de.quantummaid.eventmaid.messagefunction.MessageFunction
    public ResponseFuture request(EventType eventType, Object obj) {
        if (this.closed) {
            throw new AlreadyClosedException();
        }
        RequestHandle requestHandle = new RequestHandle(this.messageBus);
        requestHandle.send(eventType, obj);
        return requestHandle.getResponseFuture();
    }

    @Override // de.quantummaid.eventmaid.messagefunction.MessageFunction, de.quantummaid.eventmaid.internal.autoclosable.NoErrorAutoClosable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
    }
}
