package org.apache.pulsar.client.cli;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.net.URI;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.cli.AbstractCmdConsume;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import picocli.CommandLine;

@CommandLine.Command(description = {"Read messages from a specified topic"})
/* loaded from: input_file:org/apache/pulsar/client/cli/CmdRead.class */
public class CmdRead extends AbstractCmdConsume {
    private static final Pattern MSG_ID_PATTERN = Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");

    @CommandLine.Parameters(description = {"TopicName"}, arity = "1")
    private String topic;

    @CommandLine.Option(names = {"-ekv", "--encryption-key-value"}, description = {"The URI of private key to decrypt payload, for example file:///path/to/private.key or data:application/x-pem-file;base64,*****"})
    private String encKeyValue;

    @CommandLine.Option(names = {"-m", "--start-message-id"}, description = {"Initial reader position, it can be 'latest', 'earliest' or '<ledgerId>:<entryId>'"})
    private String startMessageId = "latest";

    @CommandLine.Option(names = {"-i", "--start-message-id-inclusive"}, description = {"Whether to include the position specified by -m option."})
    private boolean startMessageIdInclusive = false;

    @CommandLine.Option(names = {"-n", "--num-messages"}, description = {"Number of messages to read, 0 means to read forever."})
    private int numMessagesToRead = 1;

    @CommandLine.Option(names = {"--hex"}, description = {"Display binary messages in hex."})
    private boolean displayHex = false;

    @CommandLine.Option(names = {"--hide-content"}, description = {"Do not write the message to console."})
    private boolean hideContent = false;

    @CommandLine.Option(names = {"-r", "--rate"}, description = {"Rate (in msg/sec) at which to read, value 0 means to read messages as fast as possible."})
    private double readRate = 0.0d;

    @CommandLine.Option(names = {"-q", "--queue-size"}, description = {"Reader receiver queue size."})
    private int receiverQueueSize = 0;

    @CommandLine.Option(names = {"-mc", "--max_chunked_msg"}, description = {"Max pending chunk messages"})
    private int maxPendingChunkedMessage = 0;

    @CommandLine.Option(names = {"-ac", "--auto_ack_chunk_q_full"}, description = {"Auto ack for oldest message on queue is full"})
    private boolean autoAckOldestChunkedMessageOnQueueFull = false;

    @CommandLine.Option(names = {"-st", "--schema-type"}, description = {"Set a schema type on the reader, it can be 'bytes' or 'auto_consume'"})
    private String schemaType = "bytes";

    @CommandLine.Option(names = {"-pm", "--pool-messages"}, description = {"Use the pooled message"}, arity = "1")
    private boolean poolMessages = true;

    @CommandLine.Option(names = {"-ca", "--crypto-failure-action"}, description = {"Crypto Failure Action"})
    private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

    @CommandLine.Option(names = {"-mp", "--print-metadata"}, description = {"Message metadata"})
    private boolean printMetadata = false;

    @Override // org.apache.pulsar.client.cli.AbstractCmd
    public int run() throws PulsarClientException, IOException {
        if (this.numMessagesToRead < 0) {
            throw new IllegalArgumentException("Number of messages should be zero or positive.");
        }
        return this.serviceURL.startsWith("ws") ? readFromWebSocket(this.topic) : read(this.topic);
    }

    private int read(String str) {
        PulsarClient build;
        int i = 0;
        int i2 = 0;
        try {
            try {
                build = this.clientBuilder.build();
            } catch (Exception e) {
                LOG.error("Error while reading messages");
                LOG.error(e.getMessage(), e);
                i2 = -1;
                LOG.info("{} messages successfully read", 0);
            }
            try {
                Schema schema = this.poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
                if ("auto_consume".equals(this.schemaType)) {
                    schema = Schema.AUTO_CONSUME();
                } else if (!"bytes".equals(this.schemaType)) {
                    throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume'");
                }
                ReaderBuilder poolMessages = build.newReader(schema).topic(str).startMessageId(parseMessageId(this.startMessageId)).poolMessages(this.poolMessages);
                if (this.startMessageIdInclusive) {
                    poolMessages.startMessageIdInclusive();
                }
                if (this.maxPendingChunkedMessage > 0) {
                    poolMessages.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
                }
                if (this.receiverQueueSize > 0) {
                    poolMessages.receiverQueueSize(this.receiverQueueSize);
                }
                poolMessages.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
                poolMessages.cryptoFailureAction(this.cryptoFailureAction);
                if (StringUtils.isNotBlank(this.encKeyValue)) {
                    poolMessages.defaultCryptoKeyReader(this.encKeyValue);
                }
                Reader create = poolMessages.create();
                try {
                    RateLimiter create2 = this.readRate > 0.0d ? RateLimiter.create(this.readRate) : null;
                    while (true) {
                        if (this.numMessagesToRead != 0 && i >= this.numMessagesToRead) {
                            break;
                        }
                        if (create2 != null) {
                            create2.acquire();
                        }
                        Message<?> readNext = create.readNext(5, TimeUnit.SECONDS);
                        if (readNext == null) {
                            LOG.debug("No message to read after waiting for 5 seconds.");
                        } else {
                            try {
                                i++;
                                if (!this.hideContent) {
                                    System.out.println("----- got message -----");
                                    System.out.println(interpretMessage(readNext, this.displayHex, this.printMetadata));
                                } else if (i % 1000 == 0) {
                                    System.out.println("Received " + i + " messages");
                                }
                                readNext.release();
                            } finally {
                            }
                        }
                    }
                    if (create != null) {
                        create.close();
                    }
                    if (build != null) {
                        build.close();
                    }
                    LOG.info("{} messages successfully read", Integer.valueOf(i));
                    return i2;
                } catch (Throwable th) {
                    if (create != null) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (build != null) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            LOG.info("{} messages successfully read", 0);
            throw th5;
        }
    }

    @VisibleForTesting
    public String getWebSocketReadUri(String str) {
        String substring = this.serviceURL.substring(0, this.serviceURL.endsWith("/") ? this.serviceURL.length() - 1 : this.serviceURL.length());
        TopicName topicName = TopicName.get(str);
        return String.format("%s/ws" + (topicName.isV2() ? "/v2/" : "/") + "reader/%s?messageId=%s", substring, topicName.isV2() ? String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName()) : String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(), topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName()), ("latest".equals(this.startMessageId) || "earliest".equals(this.startMessageId)) ? this.startMessageId : Base64.getEncoder().encodeToString(parseMessageId(this.startMessageId).toByteArray()));
    }

    private int readFromWebSocket(String str) {
        int i = 0;
        int i2 = 0;
        URI create = URI.create(getWebSocketReadUri(str));
        WebSocketClient webSocketClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest clientUpgradeRequest = new ClientUpgradeRequest();
        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData();
                if (authData.hasDataForHttp()) {
                    for (Map.Entry entry : authData.getHttpHeaders()) {
                        clientUpgradeRequest.setHeader((String) entry.getKey(), (String) entry.getValue());
                    }
                }
            }
            CompletableFuture completableFuture = new CompletableFuture();
            AbstractCmdConsume.ConsumerSocket consumerSocket = new AbstractCmdConsume.ConsumerSocket(completableFuture);
            try {
                webSocketClient.start();
                try {
                    LOG.info("Trying to create websocket session..{}", create);
                    webSocketClient.connect(consumerSocket, create, clientUpgradeRequest);
                    completableFuture.get();
                    try {
                        try {
                            RateLimiter create2 = this.readRate > 0.0d ? RateLimiter.create(this.readRate) : null;
                            while (true) {
                                if (this.numMessagesToRead != 0 && i >= this.numMessagesToRead) {
                                    break;
                                }
                                if (create2 != null) {
                                    create2.acquire();
                                }
                                String receive = consumerSocket.receive(5L, TimeUnit.SECONDS);
                                if (receive == null) {
                                    LOG.debug("No message to read after waiting for 5 seconds.");
                                } else {
                                    try {
                                        System.out.println(interpretByteArray(this.displayHex, Base64.getDecoder().decode(receive)));
                                    } catch (Exception e) {
                                        System.out.println(receive);
                                    }
                                    i++;
                                }
                            }
                            consumerSocket.awaitClose(2, TimeUnit.SECONDS);
                            LOG.info("{} messages successfully read", Integer.valueOf(i));
                        } catch (Exception e2) {
                            LOG.error("Error while reading messages");
                            LOG.error(e2.getMessage(), e2);
                            i2 = -1;
                            LOG.info("{} messages successfully read", Integer.valueOf(i));
                        }
                        return i2;
                    } catch (Throwable th) {
                        LOG.info("{} messages successfully read", Integer.valueOf(i));
                        throw th;
                    }
                } catch (Exception e3) {
                    LOG.error("Failed to create web-socket session", e3);
                    return -1;
                }
            } catch (Exception e4) {
                LOG.error("Failed to start websocket-client", e4);
                return -1;
            }
        } catch (Exception e5) {
            LOG.error("Authentication plugin error: " + e5.getMessage());
            return -1;
        }
    }

    @VisibleForTesting
    static MessageId parseMessageId(String str) {
        MessageId messageIdImpl;
        if ("latest".equals(str)) {
            messageIdImpl = MessageId.latest;
        } else if ("earliest".equals(str)) {
            messageIdImpl = MessageId.earliest;
        } else {
            Matcher matcher = MSG_ID_PATTERN.matcher(str);
            if (!matcher.find()) {
                throw new IllegalArgumentException("Message ID must be 'latest', 'earliest' or '<ledgerId>:<entryId>'");
            }
            messageIdImpl = new MessageIdImpl(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)), -1);
        }
        return messageIdImpl;
    }
}
