package io.streamnative.pulsar.jms.rar;

import com.google.common.collect.ImmutableMap;
import io.streamnative.pulsar.jms.PulsarConnectionFactory;
import io.streamnative.pulsar.jms.PulsarDestination;
import io.streamnative.pulsar.jms.PulsarJMSContext;
import io.streamnative.pulsar.jms.PulsarMessage;
import io.streamnative.pulsar.jms.PulsarQueue;
import io.streamnative.pulsar.jms.PulsarTopic;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamnative/pulsar/jms/rar/PulsarMessageEndpoint.class */
/**
 * JMS {@link MessageListener} that forwards every received message to an endpoint obtained from
 * a {@link MessageEndpointFactory}.
 *
 * <p>{@link #start()} opens one JMS context (and one consumer) per configured session;
 * {@link #stop()} closes them. Each delivered message is wrapped in a
 * {@link TransactionControlHandle} so that a transaction started on that handle decides whether
 * the message is acknowledged or negatively acknowledged.
 */
public class PulsarMessageEndpoint implements MessageListener {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PulsarMessageEndpoint.class);

    /** Prefix of a destination name that has to be resolved through JNDI. */
    private static final String JNDI_LOOKUP_PREFIX = "lookup://";

    /** Reflective handle on {@link MessageListener#onMessage(Message)}, passed to beforeDelivery. */
    private static final Method ON_MESSAGE;

    static {
        try {
            ON_MESSAGE = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    private final PulsarConnectionFactory pulsarConnectionFactory;
    private final MessageEndpointFactory messageEndpointFactory;
    private final PulsarActivationSpec activationSpec;
    /** Contexts opened by {@link #startSession()}; closed and cleared by {@link #stop()}. */
    private final List<JMSContext> sessions = new ArrayList<>();

    /**
     * {@link XAResource} bound to a single message: {@code commit} acknowledges the message and
     * {@code rollback} negatively acknowledges it. {@code start} only records that a transaction
     * was begun, which the enclosing endpoint reads to decide who settles the message.
     */
    private static class TransactionControlHandle implements XAResource {

        @Generated
        private static final Logger log = LoggerFactory.getLogger(TransactionControlHandle.class);
        private final PulsarMessage message;
        /** Set once {@link #start(Xid, int)} has been invoked on this handle. */
        private boolean started;

        public TransactionControlHandle(PulsarMessage pulsarMessage) {
            this.message = pulsarMessage;
        }

        /**
         * Acknowledges the wrapped message.
         *
         * @throws XAException if the acknowledgement fails; the {@link JMSException} is attached
         *     as the cause
         */
        @Override
        public void commit(Xid xid, boolean onePhase) throws XAException {
            try {
                log.debug("commit {} onePhase {} ack message {}", xid, onePhase, this.message);
                this.message.acknowledge();
            } catch (JMSException e) {
                XAException failure = new XAException(e + "");
                // XAException has no cause-taking constructor; keep the original stack trace.
                failure.initCause(e);
                throw failure;
            }
        }

        /** No-op. */
        @Override
        public void end(Xid xid, int flags) throws XAException {
        }

        /** Always fails: not implemented. */
        @Override
        public void forget(Xid xid) throws XAException {
            throw new XAException("not implemented");
        }

        /** @return always {@code 0} */
        @Override
        public int getTransactionTimeout() throws XAException {
            return 0;
        }

        /**
         * @return {@code true} when the other resource is non-null and of exactly this class
         */
        @Override
        public boolean isSameRM(XAResource other) throws XAException {
            return other != null && other.getClass() == getClass();
        }

        /** @return always {@link XAResource#XA_OK} */
        @Override
        public int prepare(Xid xid) throws XAException {
            return XAResource.XA_OK;
        }

        /** Always fails: not implemented. */
        @Override
        public Xid[] recover(int flag) throws XAException {
            throw new XAException("not implemented");
        }

        /** Negatively acknowledges the wrapped message. */
        @Override
        public void rollback(Xid xid) throws XAException {
            this.message.negativeAck();
            log.debug("rollback {} ack message {}", xid, this.message);
        }

        /** @return always {@code false}; the timeout is not stored */
        @Override
        public boolean setTransactionTimeout(int seconds) throws XAException {
            return false;
        }

        /** Records that a transaction was started on this handle. */
        @Override
        public void start(Xid xid, int flags) throws XAException {
            log.debug("start {} flags {}", xid, flags);
            this.started = true;
        }
    }

    public PulsarMessageEndpoint(PulsarConnectionFactory pulsarConnectionFactory, MessageEndpointFactory messageEndpointFactory, PulsarActivationSpec pulsarActivationSpec) {
        this.pulsarConnectionFactory = pulsarConnectionFactory;
        this.messageEndpointFactory = messageEndpointFactory;
        this.activationSpec = pulsarActivationSpec;
    }

    public MessageEndpointFactory getMessageEndpointFactory() {
        return this.messageEndpointFactory;
    }

    public PulsarActivationSpec getActivationSpec() {
        return this.activationSpec;
    }

    /**
     * Builds the destination described by an activation spec.
     *
     * <p>A destination starting with {@code lookup://} is resolved through JNDI (the remainder
     * of the string is the JNDI name) and converted with
     * {@link PulsarConnectionFactory#toPulsarDestination}. Any other value, including
     * {@code null}, is used as the name of a new queue when {@code destinationType} is
     * {@code null} or contains "queue" (case-insensitive), and of a new topic otherwise.
     *
     * @param destinationType destination type from the activation spec, may be {@code null}
     * @param destination destination name or {@code lookup://} reference, may be {@code null}
     * @return the resolved destination
     * @throws RuntimeException wrapping the {@link NamingException} or {@link JMSException}
     *     raised by the JNDI lookup or the conversion
     */
    public PulsarDestination getPulsarDestination(String destinationType, String destination) {
        if (destination == null || !destination.startsWith(JNDI_LOOKUP_PREFIX)) {
            boolean queue = destinationType == null
                    || destinationType.toLowerCase(Locale.ROOT).contains("queue");
            return queue ? new PulsarQueue(destination) : new PulsarTopic(destination);
        }
        String jndiName = destination.substring(JNDI_LOOKUP_PREFIX.length());
        log.info("Lookup Destination from JNDI: '{}'", jndiName);
        InitialContext jndiContext = null;
        try {
            jndiContext = new InitialContext(new Hashtable<>());
            Destination found = (Destination) jndiContext.lookup(jndiName);
            log.info("Destination from JNDI: '{}': {}", jndiName, found);
            return PulsarConnectionFactory.toPulsarDestination(found);
        } catch (NamingException | JMSException e) {
            throw new RuntimeException(e);
        } finally {
            if (jndiContext != null) {
                try {
                    jndiContext.close();
                } catch (NamingException e) {
                    // The lookup outcome is already decided; a failed close must not mask it.
                    log.warn("Cannot close JNDI context used to look up '{}'", jndiName, e);
                }
            }
        }
    }

    /** Starts as many sessions as {@code activationSpec.getNumSessions()} reports. */
    public void start() {
        for (int i = 0; i < this.activationSpec.getNumSessions(); i++) {
            startSession();
        }
    }

    /**
     * Opens one client-acknowledge JMS context, registers it for {@link #stop()} and attaches
     * this object as the listener of a consumer on the configured destination.
     *
     * <p>Consumer selection:
     * <ul>
     *   <li>queue: plain consumer;</li>
     *   <li>topic, subscription type "NonDurable": plain consumer when the subscription mode is
     *       "Exclusive", shared consumer otherwise;</li>
     *   <li>topic, any other subscription type: durable consumer when the mode is "Exclusive",
     *       shared durable consumer otherwise.</li>
     * </ul>
     * A {@code null} subscription type or mode is handled like any other unrecognised value.
     *
     * @throws IllegalStateRuntimeException if the mode is "Exclusive" and the destination is a
     *     queue, or if the mode is "Exclusive" on a topic and more than one session is configured
     */
    void startSession() {
        Map<String, Object> consumerConfiguration = this.activationSpec.buildConsumerConfiguration();
        PulsarDestination destination = getPulsarDestination(this.activationSpec.getDestinationType(), this.activationSpec.getDestination());
        boolean queue = destination.isQueue();
        boolean exclusive = "Exclusive".equals(this.activationSpec.getSubscriptionMode());

        // Reject invalid combinations before any context is opened, so nothing is left to clean up.
        if (exclusive) {
            if (queue) {
                throw new IllegalStateRuntimeException("Cannot use Exclusive subscriptionMode on a Queue");
            }
            int numSessions = this.activationSpec.getNumSessions();
            if (numSessions > 1) {
                throw new IllegalStateRuntimeException("numSessions cannot be " + numSessions + " on a Exclusive subscription");
            }
        }

        PulsarJMSContext context = (PulsarJMSContext) this.pulsarConnectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE);
        if (!consumerConfiguration.isEmpty()) {
            log.info("Endpoint for {} overrides consumerConfig with {}", destination, consumerConfiguration);
            // NOTE(review): the first context is replaced here and is not tracked in `sessions`;
            // confirm whether it has to be closed separately.
            context = (PulsarJMSContext) context.createContext(context.getSessionMode(), ImmutableMap.of("consumerConfig", consumerConfiguration));
        }
        // Track the context before creating the consumer so stop() closes it even if that fails.
        this.sessions.add(context);

        if (queue) {
            context.createConsumer(destination).setMessageListener(this);
            return;
        }

        Topic topic = (Topic) destination;
        boolean durable = !"NonDurable".equals(this.activationSpec.getSubscriptionType());
        if (durable) {
            if (exclusive) {
                context.createDurableConsumer(topic, this.activationSpec.getSubscriptionName()).setMessageListener(this);
            } else {
                context.createSharedDurableConsumer(topic, this.activationSpec.getSubscriptionName()).setMessageListener(this);
            }
        } else if (exclusive) {
            context.createConsumer(topic).setMessageListener(this);
        } else {
            context.createSharedConsumer(topic, this.activationSpec.getSubscriptionName()).setMessageListener(this);
        }
    }

    /**
     * Closes every context opened by {@link #startSession()} and forgets them.
     *
     * <p>All contexts are closed even if some of them fail; the first failure is rethrown
     * afterwards with the later ones attached as suppressed exceptions.
     */
    public void stop() {
        RuntimeException firstFailure = null;
        for (JMSContext session : this.sessions) {
            try {
                session.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.sessions.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Delivers one message to a newly created endpoint.
     *
     * <p>The endpoint is bracketed with {@code beforeDelivery}/{@code afterDelivery} and always
     * released. If no transaction was started on the handle given to the endpoint factory, the
     * message is acknowledged when the listener returned normally and negatively acknowledged
     * when it threw; otherwise settlement is left to the handle's commit/rollback.
     *
     * @param message the received message; must be a {@link PulsarMessage}
     * @throws RuntimeException wrapping any failure to create the endpoint or to deliver to it
     */
    @Override
    public void onMessage(Message message) {
        PulsarMessage pulsarMessage = (PulsarMessage) message;
        TransactionControlHandle txHandle = new TransactionControlHandle(pulsarMessage);

        // Only endpoint creation is reported as "cannot create endpoint"; delivery failures are
        // handled (and wrapped) separately below.
        MessageEndpoint endpoint;
        try {
            endpoint = this.messageEndpointFactory.createEndpoint(txHandle);
        } catch (Exception e) {
            log.error("Cannot deliver message " + message + " - cannot create endpoint", e);
            throw new RuntimeException(e);
        }

        try {
            try {
                MessageListener listener = (MessageListener) endpoint;
                endpoint.beforeDelivery(ON_MESSAGE);
                boolean processed = false;
                try {
                    listener.onMessage(message);
                    processed = true;
                } finally {
                    endpoint.afterDelivery();
                    if (!txHandle.started) {
                        // No transaction took ownership of the message: settle it here.
                        if (processed) {
                            pulsarMessage.acknowledge();
                        } else {
                            pulsarMessage.negativeAck();
                        }
                    }
                }
            } finally {
                endpoint.release();
            }
        } catch (Throwable failure) {
            log.error("Cannot deliver message " + message + " to endpoint " + endpoint, failure);
            throw new RuntimeException(failure);
        }
    }

    /**
     * @return {@code true} when both arguments are the very same instances this endpoint was
     *     created with
     */
    public boolean matches(MessageEndpointFactory messageEndpointFactory, ActivationSpec activationSpec) {
        return this.messageEndpointFactory == messageEndpointFactory && this.activationSpec == activationSpec;
    }
}
