package io.quarkuscoffeeshop.kitchen.infrastructure;

import io.quarkuscoffeeshop.domain.EventType;
import io.quarkuscoffeeshop.domain.OrderInEvent;
import io.quarkuscoffeeshop.kitchen.domain.Kitchen;
import java.io.StringReader;
import java.util.concurrent.CompletionStage;
import javax.inject.Inject;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/quarkuscoffeeshop/kitchen/infrastructure/KafkaResource.class */
public class KafkaResource {
    private static final Logger logger = LoggerFactory.getLogger(KafkaResource.class);

    @Inject
    Kitchen kitchen;

    @Inject
    @Channel("orders-out")
    Emitter<String> orderUpEmitter;
    final Jsonb jsonb = JsonbBuilder.create();

    @Incoming("orders-in")
    public CompletionStage<Void> handleOrderIn(Message message) {
        logger.debug("message received: {}", message.getPayload());
        JsonObject readObject = Json.createReader(new StringReader((String) message.getPayload())).readObject();
        logger.debug("unmarshalled {}", readObject);
        if (!readObject.containsKey("eventType")) {
            return message.ack();
        }
        OrderInEvent orderInEvent = (OrderInEvent) this.jsonb.fromJson((String) message.getPayload(), OrderInEvent.class);
        return orderInEvent.eventType.equals(EventType.KITCHEN_ORDER_IN) ? this.kitchen.make(orderInEvent).thenApply(event -> {
            logger.debug("sending: {}", event.toString());
            return this.orderUpEmitter.send(this.jsonb.toJson(event));
        }).thenRun(() -> {
            message.ack();
        }) : message.ack();
    }
}
