package io.quarkiverse.hivemqclient.smallrye.reactive;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.rx.FlowableWithSingle;
import io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQClients;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.groups.MultiOnFailure;
import io.smallrye.reactive.messaging.mqtt.MqttFailStop;
import io.smallrye.reactive.messaging.mqtt.MqttFailureHandler;
import io.smallrye.reactive.messaging.mqtt.MqttIgnoreFailure;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import mutiny.zero.flow.adapters.AdaptersToFlow;

/* loaded from: input_file:io/quarkiverse/hivemqclient/smallrye/reactive/HiveMQMqttSource.class */
public class HiveMQMqttSource {
    private final Flow.Publisher<HiveMQReceivingMqttMessage> source;
    private final AtomicBoolean subscribed = new AtomicBoolean();
    private final Pattern pattern;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.quarkiverse.hivemqclient.smallrye.reactive.HiveMQMqttSource$1, reason: invalid class name */
    /* loaded from: input_file:io/quarkiverse/hivemqclient/smallrye/reactive/HiveMQMqttSource$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$smallrye$reactive$messaging$mqtt$MqttFailureHandler$Strategy = new int[MqttFailureHandler.Strategy.values().length];

        static {
            try {
                $SwitchMap$io$smallrye$reactive$messaging$mqtt$MqttFailureHandler$Strategy[MqttFailureHandler.Strategy.IGNORE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$smallrye$reactive$messaging$mqtt$MqttFailureHandler$Strategy[MqttFailureHandler.Strategy.FAIL.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public HiveMQMqttSource(HiveMQMqttConnectorIncomingConfiguration hiveMQMqttConnectorIncomingConfiguration) {
        Optional<String> topic = hiveMQMqttConnectorIncomingConfiguration.getTopic();
        Objects.requireNonNull(hiveMQMqttConnectorIncomingConfiguration);
        String orElseGet = topic.orElseGet(hiveMQMqttConnectorIncomingConfiguration::getChannel);
        int intValue = hiveMQMqttConnectorIncomingConfiguration.getQos().intValue();
        boolean booleanValue = hiveMQMqttConnectorIncomingConfiguration.getBroadcast().booleanValue();
        MqttFailureHandler createFailureHandler = createFailureHandler(MqttFailureHandler.Strategy.from(hiveMQMqttConnectorIncomingConfiguration.getFailureStrategy()), hiveMQMqttConnectorIncomingConfiguration.getChannel());
        this.pattern = createPatternFromTopic(orElseGet);
        HiveMQClients.ClientHolder holder = HiveMQClients.getHolder(hiveMQMqttConnectorIncomingConfiguration);
        HiveMQPing.isServerReachable(HiveMQClients.getHolder(hiveMQMqttConnectorIncomingConfiguration));
        this.source = createMqttSource(holder, orElseGet, intValue, booleanValue, createFailureHandler);
    }

    private Pattern createPatternFromTopic(String str) {
        if (str.contains("#") || str.contains("+")) {
            return Pattern.compile(str.replace("+", "[^/]+").replace("#", ".+"));
        }
        return null;
    }

    private boolean matches(String str, Mqtt3Publish mqtt3Publish) {
        String obj = mqtt3Publish.getTopic().toString();
        return this.pattern != null ? this.pattern.matcher(obj).matches() : obj.equals(str);
    }

    private MqttFailureHandler createFailureHandler(MqttFailureHandler.Strategy strategy, String str) {
        switch (AnonymousClass1.$SwitchMap$io$smallrye$reactive$messaging$mqtt$MqttFailureHandler$Strategy[strategy.ordinal()]) {
            case 1:
                return new MqttIgnoreFailure(str);
            case 2:
                return new MqttFailStop(str);
            default:
                throw MqttExceptions.ex.illegalArgumentUnknownStrategy(strategy.toString());
        }
    }

    public Flow.Publisher<HiveMQReceivingMqttMessage> getSource() {
        return this.source;
    }

    public boolean isSubscribed() {
        return this.subscribed.get();
    }

    private Multi<HiveMQReceivingMqttMessage> createMqttSource(HiveMQClients.ClientHolder clientHolder, String str, int i, boolean z, MqttFailureHandler mqttFailureHandler) {
        return clientHolder.connect().onItem().transformToMulti(mqtt3RxClient -> {
            MultiOnFailure onFailure = ((Multi) Multi.createFrom().publisher(AdaptersToFlow.publisher(((FlowableWithSingle) mqtt3RxClient.subscribePublishesWith().topicFilter(str).qos(MqttQos.fromCode(i)).applySubscribe()).doOnSingle(mqtt3SubAck -> {
                this.subscribed.set(true);
            }))).filter(mqtt3Publish -> {
                return matches(str, mqtt3Publish);
            }).onItem().transform(mqtt3Publish2 -> {
                return new HiveMQReceivingMqttMessage(mqtt3Publish2, mqttFailureHandler);
            }).stage(multi -> {
                return z ? multi.broadcast().toAllSubscribers() : multi;
            })).onCancellation().invoke(() -> {
                this.subscribed.set(false);
            }).onFailure();
            MqttLogging mqttLogging = MqttLogging.log;
            Objects.requireNonNull(mqttLogging);
            return onFailure.invoke(mqttLogging::unableToConnectToBroker);
        });
    }
}
