package org.codeartisans.java.sos.messagebus;

import java.util.Iterator;
import org.codeartisans.java.sos.SOSFailure;
import org.codeartisans.java.sos.threading.WorkQueueComposite;
import org.qi4j.api.injection.scope.Service;
import org.qi4j.api.injection.scope.Structure;
import org.qi4j.api.unitofwork.UnitOfWork;
import org.qi4j.api.unitofwork.UnitOfWorkCompletionException;
import org.qi4j.api.unitofwork.UnitOfWorkFactory;

/* loaded from: input_file:org/codeartisans/java/sos/messagebus/SingleThreadDeliveryMixin.class */
/**
 * Message bus mixin that hands every published message to the injected
 * {@link WorkQueueComposite} instead of delivering it on the caller's thread.
 *
 * <p>For each message that is not vetoed, a single {@link UnitOfWork} is
 * opened, the message is delivered to every subscriber registered for its
 * message type, and the unit of work is then completed exactly once. If the
 * unit of work cannot be completed, or delivery aborts with an exception, the
 * unit of work is discarded so it is never left open on the worker thread.</p>
 *
 * <p>NOTE(review): the class name suggests the work queue runs its tasks on a
 * single thread; that is a property of the injected queue and is not visible
 * here.</p>
 */
public abstract class SingleThreadDeliveryMixin extends BaseMessageBus implements MessageBusComposite {

    /** Factory used to open one unit of work per delivered message. */
    @Structure
    private UnitOfWorkFactory uowf;

    /** Queue on which the delivery tasks are scheduled. */
    @Service
    private WorkQueueComposite workQueue;

    /**
     * Schedules the delivery of {@code message} to all of its subscribers.
     *
     * <p>The queued task does nothing if the message is vetoed. Otherwise it
     * delivers the message to each subscriber inside one unit of work and
     * completes that unit of work once all deliveries are done.</p>
     *
     * @param message the message to deliver
     * @param <S>     the subscriber type the message is addressed to
     * @throws SOSFailure thrown by the queued task (not by this method) when
     *                    the unit of work cannot be completed
     */
    @Override
    public <S extends Subscriber> void publish(final Message<S> message) {
        workQueue.enqueue(new Runnable() {
            @Override
            public void run() {
                if (vetoed(message)) {
                    return;
                }
                UnitOfWork uow = uowf.newUnitOfWork();
                boolean completed = false;
                try {
                    Iterator<?> it = subscribers(message.getMessageType()).iterator();
                    while (it.hasNext()) {
                        message.deliver((Subscriber) it.next());
                    }
                    // Complete once, after every subscriber has been served: the
                    // unit of work was opened once for the whole message.
                    uow.complete();
                    completed = true;
                } catch (UnitOfWorkCompletionException e) {
                    throw new SOSFailure("Error during message delivery", e);
                } finally {
                    // Covers both a failed completion and any exception raised
                    // by a subscriber, so the unit of work is never leaked.
                    if (!completed) {
                        uow.discard();
                    }
                }
            }
        });
    }

    /**
     * Schedules the delivery of {@code message} to all of its subscribers and
     * reports the outcome to {@code deliveryCallback}.
     *
     * <p>A {@link DeliveryRefusalException} raised by a subscriber does not
     * stop the delivery to the remaining subscribers; it is only recorded.
     * Once delivery is over (or was skipped because the message is vetoed),
     * {@code deliveryCallback.afterDelivery} is invoked with {@code true} if
     * at least one subscriber refused the message, {@code false} otherwise.
     * The callback is not invoked when the task fails with an exception.</p>
     *
     * @param message          the message to deliver
     * @param deliveryCallback notified after delivery; may be {@code null}
     * @param <S>              the subscriber type the message is addressed to
     * @throws SOSFailure thrown by the queued task (not by this method) when
     *                    the unit of work cannot be completed
     */
    @Override
    public <S extends Subscriber> void publish(final Message<S> message, final DeliveryCallback deliveryCallback) {
        workQueue.enqueue(new Runnable() {
            @Override
            public void run() {
                boolean refused = false;
                if (!vetoed(message)) {
                    UnitOfWork uow = uowf.newUnitOfWork();
                    boolean completed = false;
                    try {
                        Iterator<?> it = subscribers(message.getMessageType()).iterator();
                        while (it.hasNext()) {
                            try {
                                message.deliver((Subscriber) it.next());
                            } catch (DeliveryRefusalException refusal) {
                                // Deliberately not rethrown: a refusal is reported
                                // through the callback, not treated as a failure.
                                refused = true;
                            }
                        }
                        // Complete once, after every subscriber has been served.
                        uow.complete();
                        completed = true;
                    } catch (UnitOfWorkCompletionException e) {
                        throw new SOSFailure("Error during message delivery", e);
                    } finally {
                        // Discard on failed completion or unexpected subscriber
                        // exception so the unit of work is never leaked.
                        if (!completed) {
                            uow.discard();
                        }
                    }
                }
                if (deliveryCallback != null) {
                    deliveryCallback.afterDelivery(refused);
                }
            }
        });
    }
}
