package io.bootique.rabbitmq.client.pubsub;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.bootique.rabbitmq.client.channel.RmqChannelBuilder;
import io.bootique.rabbitmq.client.channel.RmqChannelFactory;
import io.bootique.rabbitmq.client.topology.RmqTopology;
import java.io.IOException;
import java.util.Map;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/bootique/rabbitmq/client/pubsub/RmqSubBuilder.class */
public class RmqSubBuilder {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmqSubBuilder.class);
    private final RmqChannelFactory channelFactory;
    private final String connectionName;
    private final Map<String, Channel> consumerChannels;
    private String exchange;
    private String queue;
    private String routingKey;
    private boolean autoAck;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/bootique/rabbitmq/client/pubsub/RmqSubBuilder$DeliverOrCancelConsumer.class */
    public static class DeliverOrCancelConsumer implements Consumer {
        private final CancelCallback onCancel;
        private final DeliverCallback onDeliver;

        public DeliverOrCancelConsumer(DeliverCallback deliverCallback, CancelCallback cancelCallback) {
            this.onCancel = cancelCallback;
            this.onDeliver = deliverCallback;
        }

        public void handleConsumeOk(String str) {
        }

        public void handleCancelOk(String str) {
        }

        public void handleCancel(String str) throws IOException {
            this.onCancel.handle(str);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        }

        public void handleRecoverOk(String str) {
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            this.onDeliver.handle(str, new Delivery(envelope, basicProperties, bArr));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RmqSubBuilder(RmqChannelFactory rmqChannelFactory, String str, Map<String, Channel> map) {
        this.channelFactory = rmqChannelFactory;
        this.connectionName = str;
        this.consumerChannels = map;
    }

    public RmqSubBuilder exchange(String str) {
        this.exchange = RmqTopology.normalizeName(str);
        return this;
    }

    public RmqSubBuilder queue(String str) {
        this.queue = RmqTopology.normalizeName(str);
        return this;
    }

    public RmqSubBuilder routingKey(String str) {
        this.routingKey = RmqTopology.normalizeName(str);
        return this;
    }

    public RmqSubBuilder autoAck(boolean z) {
        this.autoAck = z;
        return this;
    }

    public String subscribe(DeliverCallback deliverCallback) {
        return subscribe(deliverCallback, str -> {
            LOGGER.debug("Subscription was canceled for '{}'", str);
        });
    }

    public String subscribe(DeliverCallback deliverCallback, CancelCallback cancelCallback) {
        return subscribe(new DeliverOrCancelConsumer(deliverCallback, cancelCallback));
    }

    public String subscribe(Consumer consumer) {
        return subscribe(channel -> {
            return consumer;
        });
    }

    public String subscribe(Function<Channel, Consumer> function) {
        Channel createChannelWithTopology = createChannelWithTopology();
        try {
            String basicConsume = createChannelWithTopology.basicConsume(this.queue, this.autoAck, function.apply(createChannelWithTopology));
            this.consumerChannels.put(basicConsume, createChannelWithTopology);
            return basicConsume;
        } catch (IOException e) {
            throw new RuntimeException("Error publishing RMQ message for connection: " + this.connectionName, e);
        }
    }

    protected Channel createChannelWithTopology() {
        RmqTopology.required(this.queue, "Consumer queue is not defined");
        RmqChannelBuilder newChannel = this.channelFactory.newChannel(this.connectionName);
        if (RmqTopology.isDefined(this.exchange)) {
            newChannel.ensureQueueBoundToExchange(this.queue, this.exchange, this.routingKey);
        } else {
            newChannel.ensureQueue(this.queue);
        }
        return newChannel.open();
    }
}
