package io.quarkiverse.hivemqclient.smallrye.reactive;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3Client;
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import io.reactivex.Flowable;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.converters.uni.UniRxConverters;
import io.smallrye.reactive.messaging.mqtt.SendingMqttMessage;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

/* loaded from: input_file:io/quarkiverse/hivemqclient/smallrye/reactive/HiveMQMqttSink.class */
public class HiveMQMqttSink {
    private final String topic;
    private final int qos;
    private final SubscriberBuilder<? extends Message<?>, Void> sink;
    private final AtomicBoolean connected = new AtomicBoolean();

    public HiveMQMqttSink(Vertx vertx, HiveMQMqttConnectorOutgoingConfiguration hiveMQMqttConnectorOutgoingConfiguration) {
        Optional<String> topic = hiveMQMqttConnectorOutgoingConfiguration.getTopic();
        Objects.requireNonNull(hiveMQMqttConnectorOutgoingConfiguration);
        this.topic = topic.orElseGet(hiveMQMqttConnectorOutgoingConfiguration::getChannel);
        this.qos = hiveMQMqttConnectorOutgoingConfiguration.getQos().intValue();
        AtomicReference atomicReference = new AtomicReference();
        ProcessorBuilder onComplete = ReactiveStreams.builder().flatMapCompletionStage(message -> {
            Mqtt3RxClient mqtt3RxClient = (Mqtt3RxClient) atomicReference.get();
            if (mqtt3RxClient == null) {
                return HiveMQClients.getConnectedClient(hiveMQMqttConnectorOutgoingConfiguration).map(mqtt3RxClient2 -> {
                    atomicReference.set(mqtt3RxClient2);
                    this.connected.set(true);
                    return message;
                }).subscribeAsCompletionStage();
            }
            if (mqtt3RxClient.getState().isConnected()) {
                this.connected.set(true);
                return CompletableFuture.completedFuture(message);
            }
            CompletableFuture completableFuture = new CompletableFuture();
            vertx.setPeriodic(100L, l -> {
                if (mqtt3RxClient.getState().isConnected()) {
                    vertx.cancelTimer(l.longValue());
                    this.connected.set(true);
                    completableFuture.complete(message);
                }
            });
            return completableFuture;
        }).flatMapCompletionStage(message2 -> {
            return send(atomicReference, message2);
        }).onComplete(() -> {
            Mqtt3Client mqtt3Client = (Mqtt3Client) atomicReference.getAndSet(null);
            if (mqtt3Client != null) {
                this.connected.set(false);
                mqtt3Client.toBlocking().disconnect();
            }
        });
        MqttLogging mqttLogging = MqttLogging.log;
        Objects.requireNonNull(mqttLogging);
        this.sink = onComplete.onError(mqttLogging::errorWhileSendingMessageToBroker).ignore();
    }

    private CompletionStage<?> send(AtomicReference<Mqtt3RxClient> atomicReference, Message<?> message) {
        Mqtt3RxClient mqtt3RxClient = atomicReference.get();
        String str = this.topic;
        MqttQos fromCode = MqttQos.fromCode(this.qos);
        boolean z = false;
        if (message instanceof SendingMqttMessage) {
            SendingMqttMessage sendingMqttMessage = (SendingMqttMessage) message;
            str = sendingMqttMessage.getTopic() == null ? this.topic : sendingMqttMessage.getTopic();
            fromCode = MqttQos.fromCode(sendingMqttMessage.getQosLevel() == null ? fromCode.getCode() : sendingMqttMessage.getQosLevel().value());
            z = sendingMqttMessage.isRetain();
        }
        if (str != null) {
            return Uni.createFrom().converter(UniRxConverters.fromFlowable(), mqtt3RxClient.publish(Flowable.just(Mqtt3Publish.builder().topic(str).qos(fromCode).payload(convert(message.getPayload())).retain(z).build()))).onItemOrFailure().transformToUni((mqtt3PublishResult, th) -> {
                return th != null ? Uni.createFrom().completionStage(message.nack(th).thenApply(r3 -> {
                    return message;
                })) : Uni.createFrom().completionStage(message.ack().thenApply(r32 -> {
                    return message;
                }));
            }).subscribeAsCompletionStage();
        }
        MqttLogging.log.ignoringNoTopicSet();
        return CompletableFuture.completedFuture(message);
    }

    private ByteBuffer convert(Object obj) {
        return ByteBuffer.wrap(toBuffer(obj).getBytes());
    }

    private Buffer toBuffer(Object obj) {
        return obj instanceof JsonObject ? new Buffer(((JsonObject) obj).toBuffer()) : obj instanceof JsonArray ? new Buffer(((JsonArray) obj).toBuffer()) : ((obj instanceof String) || obj.getClass().isPrimitive()) ? new Buffer(io.vertx.core.buffer.Buffer.buffer(obj.toString())) : obj instanceof byte[] ? new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) obj)) : obj instanceof Buffer ? (Buffer) obj : obj instanceof io.vertx.core.buffer.Buffer ? new Buffer((io.vertx.core.buffer.Buffer) obj) : new Buffer(Json.encodeToBuffer(obj));
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSink() {
        return this.sink;
    }

    public boolean isReady() {
        return this.connected.get();
    }
}
