package org.apache.pulsar.client.cli;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.net.URI;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.cli.AbstractCmdConsume;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import picocli.CommandLine;

@CommandLine.Command(description = {"Consume messages from a specified topic"})
/* loaded from: input_file:org/apache/pulsar/client/cli/CmdConsume.class */
public class CmdConsume extends AbstractCmdConsume {

    @CommandLine.Parameters(description = {"TopicName"}, arity = "1")
    private String topic;

    @CommandLine.Option(names = {"-s", "--subscription-name"}, required = true, description = {"Subscription name."})
    private String subscriptionName;

    @CommandLine.Option(names = {"-ekv", "--encryption-key-value"}, description = {"The URI of private key to decrypt payload, for example file:///path/to/private.key or data:application/x-pem-file;base64,*****"})
    private String encKeyValue;

    @CommandLine.Spec
    private CommandLine.Model.CommandSpec commandSpec;

    @CommandLine.Option(names = {"-t", "--subscription-type"}, description = {"Subscription type."})
    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

    @CommandLine.Option(names = {"-m", "--subscription-mode"}, description = {"Subscription mode."})
    private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

    @CommandLine.Option(names = {"-p", "--subscription-position"}, description = {"Subscription position."})
    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

    @CommandLine.Option(names = {"-n", "--num-messages"}, description = {"Number of messages to consume, 0 means to consume forever."})
    private int numMessagesToConsume = 1;

    @CommandLine.Option(names = {"--hex"}, description = {"Display binary messages in hex."})
    private boolean displayHex = false;

    @CommandLine.Option(names = {"--hide-content"}, description = {"Do not write the message to console."})
    private boolean hideContent = false;

    @CommandLine.Option(names = {"-r", "--rate"}, description = {"Rate (in msg/sec) at which to consume, value 0 means to consume messages as fast as possible."})
    private double consumeRate = 0.0d;

    @CommandLine.Option(names = {"--regex"}, description = {"Indicate the topic name is a regex pattern"})
    private boolean isRegex = false;

    @CommandLine.Option(names = {"-q", "--queue-size"}, description = {"Consumer receiver queue size."})
    private int receiverQueueSize = 0;

    @CommandLine.Option(names = {"-mc", "--max_chunked_msg"}, description = {"Max pending chunk messages"})
    private int maxPendingChunkedMessage = 0;

    @CommandLine.Option(names = {"-ac", "--auto_ack_chunk_q_full"}, description = {"Auto ack for oldest message on queue is full"})
    private boolean autoAckOldestChunkedMessageOnQueueFull = false;

    @CommandLine.Option(names = {"-st", "--schema-type"}, description = {"Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'"})
    private String schemaType = "bytes";

    @CommandLine.Option(names = {"-pm", "--pool-messages"}, description = {"Use the pooled message"}, arity = "1")
    private boolean poolMessages = true;

    @CommandLine.Option(names = {"-rs", "--replicated"}, description = {"Whether the subscription status should be replicated"})
    private boolean replicateSubscriptionState = false;

    @CommandLine.Option(names = {"-ca", "--crypto-failure-action"}, description = {"Crypto Failure Action"})
    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

    @CommandLine.Option(names = {"-mp", "--print-metadata"}, description = {"Message metadata"})
    private boolean printMetadata = false;

    @Override // org.apache.pulsar.client.cli.AbstractCmd
    public int run() throws IOException {
        if (this.subscriptionName == null || this.subscriptionName.isEmpty()) {
            throw new CommandLine.ParameterException(this.commandSpec.commandLine(), "Subscription name is not provided.");
        }
        if (this.numMessagesToConsume < 0) {
            throw new CommandLine.ParameterException(this.commandSpec.commandLine(), "Number of messages should be zero or positive.");
        }
        return this.serviceURL.startsWith("ws") ? consumeFromWebSocket(this.topic) : consume(this.topic);
    }

    private int consume(String str) {
        PulsarClient build;
        int i = 0;
        int i2 = 0;
        try {
            try {
                build = this.clientBuilder.build();
            } catch (Exception e) {
                LOG.error("Error while consuming messages");
                LOG.error(e.getMessage(), e);
                i2 = -1;
                LOG.info("{} messages successfully consumed", 0);
            }
            try {
                Schema schema = this.poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
                if ("auto_consume".equals(this.schemaType)) {
                    schema = Schema.AUTO_CONSUME();
                } else if (!"bytes".equals(this.schemaType)) {
                    throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume'");
                }
                ConsumerBuilder replicateSubscriptionState = build.newConsumer(schema).subscriptionName(this.subscriptionName).subscriptionType(this.subscriptionType).subscriptionMode(this.subscriptionMode).subscriptionInitialPosition(this.subscriptionInitialPosition).poolMessages(this.poolMessages).replicateSubscriptionState(this.replicateSubscriptionState);
                if (this.isRegex) {
                    replicateSubscriptionState.topicsPattern(Pattern.compile(str));
                } else {
                    replicateSubscriptionState.topic(new String[]{str});
                }
                if (this.maxPendingChunkedMessage > 0) {
                    replicateSubscriptionState.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
                }
                if (this.receiverQueueSize > 0) {
                    replicateSubscriptionState.receiverQueueSize(this.receiverQueueSize);
                }
                replicateSubscriptionState.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
                replicateSubscriptionState.cryptoFailureAction(this.cryptoFailureAction);
                if (StringUtils.isNotBlank(this.encKeyValue)) {
                    replicateSubscriptionState.defaultCryptoKeyReader(this.encKeyValue);
                }
                Consumer subscribe = replicateSubscriptionState.subscribe();
                try {
                    RateLimiter create = this.consumeRate > 0.0d ? RateLimiter.create(this.consumeRate) : null;
                    while (true) {
                        if (this.numMessagesToConsume != 0 && i >= this.numMessagesToConsume) {
                            break;
                        }
                        if (create != null) {
                            create.acquire();
                        }
                        Message<?> receive = subscribe.receive(5, TimeUnit.SECONDS);
                        if (receive == null) {
                            LOG.debug("No message to consume after waiting for 5 seconds.");
                        } else {
                            try {
                                i++;
                                if (!this.hideContent) {
                                    System.out.println("----- got message -----");
                                    System.out.println(interpretMessage(receive, this.displayHex, this.printMetadata));
                                } else if (i % 1000 == 0) {
                                    System.out.println("Received " + i + " messages");
                                }
                                subscribe.acknowledge(receive);
                                receive.release();
                            } finally {
                            }
                        }
                    }
                    if (subscribe != null) {
                        subscribe.close();
                    }
                    if (build != null) {
                        build.close();
                    }
                    LOG.info("{} messages successfully consumed", Integer.valueOf(i));
                    return i2;
                } catch (Throwable th) {
                    if (subscribe != null) {
                        try {
                            subscribe.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (build != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            LOG.info("{} messages successfully consumed", 0);
            throw th5;
        }
    }

    @VisibleForTesting
    public String getWebSocketConsumeUri(String str) {
        String substring = this.serviceURL.substring(0, this.serviceURL.endsWith("/") ? this.serviceURL.length() - 1 : this.serviceURL.length());
        TopicName topicName = TopicName.get(str);
        return String.format("%s/ws" + (topicName.isV2() ? "/v2/" : "/") + "consumer/%s/%s?subscriptionType=%s&subscriptionMode=%s", substring, topicName.isV2() ? String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName()) : String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName()), this.subscriptionName, this.subscriptionType.toString(), this.subscriptionMode.toString());
    }

    private int consumeFromWebSocket(String str) {
        int i = 0;
        int i2 = 0;
        URI create = URI.create(getWebSocketConsumeUri(str));
        WebSocketClient webSocketClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData(create.getHost());
                if (authData.hasDataForHttp()) {
                    for (Map.Entry entry : authData.getHttpHeaders()) {
                        clientUpgradeRequest.setHeader((String) entry.getKey(), (String) entry.getValue());
                    }
                }
            }
            CompletableFuture completableFuture = new CompletableFuture();
            AbstractCmdConsume.ConsumerSocket consumerSocket = new AbstractCmdConsume.ConsumerSocket(completableFuture);
            try {
                webSocketClient.start();
                try {
                    LOG.info("Trying to create websocket session..{}", create);
                    webSocketClient.connect(consumerSocket, create, clientUpgradeRequest);
                    completableFuture.get();
                    try {
                        try {
                            RateLimiter create2 = this.consumeRate > 0.0d ? RateLimiter.create(this.consumeRate) : null;
                            while (true) {
                                if (this.numMessagesToConsume != 0 && i >= this.numMessagesToConsume) {
                                    break;
                                }
                                if (create2 != null) {
                                    create2.acquire();
                                }
                                String receive = consumerSocket.receive(5L, TimeUnit.SECONDS);
                                if (receive == null) {
                                    LOG.debug("No message to consume after waiting for 5 seconds.");
                                } else {
                                    try {
                                        System.out.println(interpretByteArray(this.displayHex, Base64.getDecoder().decode(receive)));
                                    } catch (Exception e) {
                                        System.out.println(receive);
                                    }
                                    i++;
                                }
                            }
                            consumerSocket.awaitClose(2, TimeUnit.SECONDS);
                            LOG.info("{} messages successfully consumed", Integer.valueOf(i));
                        } catch (Throwable th) {
                            LOG.info("{} messages successfully consumed", Integer.valueOf(i));
                            throw th;
                        }
                    } catch (Exception e2) {
                        LOG.error("Error while consuming messages");
                        LOG.error(e2.getMessage(), e2);
                        i2 = -1;
                        LOG.info("{} messages successfully consumed", Integer.valueOf(i));
                    }
                    return i2;
                } catch (Exception e3) {
                    LOG.error("Failed to create web-socket session", e3);
                    return -1;
                }
            } catch (Exception e4) {
                LOG.error("Failed to start websocket-client", e4);
                return -1;
            }
        } catch (Exception e5) {
            LOG.error("Authentication plugin error: " + e5.getMessage());
            return -1;
        }
    }
}
