package org.apache.pulsar.client.cli;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "Consume messages from a specified topic")
/* loaded from: input_file:org/apache/pulsar/client/cli/CmdConsume.class */
public class CmdConsume {
    // NOTE(review): the logger is registered under PulsarClientTool rather than CmdConsume;
    // presumably intentional so that all CLI sub-commands log under one name — confirm.
    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientTool.class);

    /** Separator line printed to stdout before every message when content is displayed. */
    private static final String MESSAGE_BOUNDARY = "----- got message -----";

    @Parameter(names = {"-s", "--subscription-name"}, required = true, description = "Subscription name.")
    private String subscriptionName;

    @Parameter(names = {"-ekv", "--encryption-key-value"}, description = "The URI of private key to decrypt payload, for example file:///path/to/private.key or data:application/x-pem-file;base64,*****")
    private String encKeyValue;

    // The three fields below are not command-line options; they are injected through updateConfig(...).
    private ClientBuilder clientBuilder;
    private Authentication authentication;
    private String serviceURL;

    /** Positional arguments; run() requires exactly one entry, the topic name (or pattern). */
    @Parameter(description = "TopicName", required = true)
    private List<String> mainOptions = new ArrayList<>();

    @Parameter(names = {"-t", "--subscription-type"}, description = "Subscription type.")
    private SubscriptionType subscriptionType = SubscriptionType.Exclusive;

    @Parameter(names = {"-m", "--subscription-mode"}, description = "Subscription mode.")
    private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

    @Parameter(names = {"-p", "--subscription-position"}, description = "Subscription position.")
    private SubscriptionInitialPosition subscriptionInitialPosition = SubscriptionInitialPosition.Latest;

    /** Upper bound on consumed messages; 0 disables the bound, negative values are rejected by run(). */
    @Parameter(names = {"-n", "--num-messages"}, description = "Number of messages to consume, 0 means to consume forever.")
    private int numMessagesToConsume = 1;

    @Parameter(names = {"--hex"}, description = "Display binary messages in hex.")
    private boolean displayHex = false;

    @Parameter(names = {"--hide-content"}, description = "Do not write the message to console.")
    private boolean hideContent = false;

    /** Values greater than 0 enable a rate limiter in the consume loops; 0 (or less) disables it. */
    @Parameter(names = {"-r", "--rate"}, description = "Rate (in msg/sec) at which to consume, value 0 means to consume messages as fast as possible.")
    private double consumeRate = 0.0d;

    /** When set, the topic argument is compiled as a regular expression and used as a topics pattern. */
    @Parameter(names = {"--regex"}, description = "Indicate the topic name is a regex pattern")
    private boolean isRegex = false;

    /** Only applied to the consumer builder when greater than 0. */
    @Parameter(names = {"-q", "--queue-size"}, description = "Consumer receiver queue size.")
    private int receiverQueueSize = 0;

    /** Only applied to the consumer builder when greater than 0. */
    @Parameter(names = {"-mc", "--max_chunked_msg"}, description = "Max pending chunk messages")
    private int maxPendingChunkedMessage = 0;

    @Parameter(names = {"-ac", "--auto_ack_chunk_q_full"}, description = "Auto ack for oldest message on queue is full")
    private boolean autoAckOldestChunkedMessageOnQueueFull = false;

    /** Accepted values are exactly "bytes" and "auto_consume"; anything else is rejected in consume(). */
    @Parameter(names = {"-st", "--schema-type"}, description = "Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'")
    private String schematype = "bytes";

    /** Selects the ByteBuffer schema (instead of byte[]) and enables pooled messages on the consumer. */
    @Parameter(names = {"-pm", "--pool-messages"}, description = "Use the pooled message")
    private boolean poolMessages = true;

    /*
     * Decompiler-emitted helper holding a lookup table from SchemaType ordinal to a small
     * integer case label. The table is sized to SchemaType.values().length; entries that are
     * never assigned keep the int[] default of 0.
     *
     * Case labels assigned below: 1 = AVRO, 2 = JSON, 3 = PROTOBUF_NATIVE, 4 = KEY_VALUE.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.pulsar.client.cli.CmdConsume$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/client/cli/CmdConsume$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by SchemaType.ordinal(); value is the case label listed in the header comment.
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$schema$SchemaType = new int[SchemaType.values().length];

        static {
            // Each constant is registered in its own try block so that a NoSuchFieldError for one
            // constant is ignored and the remaining constants are still registered.
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.AVRO.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.JSON.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.PROTOBUF_NATIVE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$schema$SchemaType[SchemaType.KEY_VALUE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /**
     * WebSocket endpoint used by the WebSocket consume path.
     *
     * <p>Every incoming text frame is parsed as JSON, acknowledged by sending back a JSON object
     * that carries the frame's {@code messageId}, and then queued so that it can be fetched with
     * {@link #receive(long, TimeUnit)}.
     */
    @WebSocket(maxTextMessageSize = 65536)
    public static class ConsumerSocket {
        private static final String X_PULSAR_MESSAGE_ID = "messageId";
        private static final Logger log = LoggerFactory.getLogger(ConsumerSocket.class);
        // A single Gson instance is reused instead of building a new one for every frame.
        private static final Gson GSON = new Gson();

        // Completed (with null) as soon as the connect callback fires.
        private final CompletableFuture<Void> connected;
        private final CountDownLatch closeLatch = new CountDownLatch(1);
        final BlockingQueue<String> incomingMessages = new GrowableArrayBlockingQueue<>();
        // Written from the WebSocket callbacks and read by whoever calls the accessors below,
        // so it is volatile to make the latest value visible across threads.
        private volatile Session session;

        /**
         * @param completableFuture future that is completed once the connection is established
         */
        public ConsumerSocket(CompletableFuture<Void> completableFuture) {
            this.connected = completableFuture;
        }

        /**
         * Waits for the close callback.
         *
         * @param i        maximum time to wait
         * @param timeUnit unit of {@code i}
         * @return {@code true} if the connection was closed within the timeout
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean awaitClose(int i, TimeUnit timeUnit) throws InterruptedException {
            return this.closeLatch.await(i, timeUnit);
        }

        /** Close callback: drops the session reference and releases {@link #awaitClose} waiters. */
        @OnWebSocketClose
        public void onClose(int i, String str) {
            log.info("Connection closed: {} - {}", i, str);
            this.session = null;
            this.closeLatch.countDown();
        }

        /** Connect callback: stores the session and completes the {@code connected} future. */
        @OnWebSocketConnect
        public void onConnect(Session session) throws InterruptedException {
            log.info("Got connect: {}", session);
            this.session = session;
            this.connected.complete(null);
        }

        /**
         * Message callback: acknowledges the frame, then queues the raw text.
         *
         * @param str raw JSON text of the frame; it must contain a {@code messageId} member
         * @throws Exception if the frame cannot be parsed, acknowledged or queued
         */
        @OnWebSocketMessage
        public synchronized void onMessage(String str) throws Exception {
            JsonObject incoming = GSON.fromJson(str, JsonObject.class);
            JsonObject ack = new JsonObject();
            ack.add(X_PULSAR_MESSAGE_ID, new JsonPrimitive(incoming.get(X_PULSAR_MESSAGE_ID).getAsString()));
            // The acknowledgement is sent before the frame is handed to the consumer of the queue.
            getRemote().sendString(ack.toString());
            this.incomingMessages.put(str);
        }

        /**
         * Fetches the next queued frame.
         *
         * @param j        maximum time to wait
         * @param timeUnit unit of {@code j}
         * @return the raw frame text, or {@code null} if nothing arrived within the timeout
         * @throws Exception if the wait is interrupted
         */
        public String receive(long j, TimeUnit timeUnit) throws Exception {
            return this.incomingMessages.poll(j, timeUnit);
        }

        /**
         * @return the remote endpoint of the current session
         * @throws NullPointerException if there is no open session
         */
        public RemoteEndpoint getRemote() {
            return this.session.getRemote();
        }

        /** @return the current session, or {@code null} before connect and after close */
        public Session getSession() {
            return this.session;
        }

        /** Closes the current session; does nothing when there is no open session. */
        public void close() {
            // Snapshot the field: the close callback may null it concurrently.
            Session current = this.session;
            if (current != null) {
                current.close();
            }
        }
    }

    /**
     * Supplies the connection settings this command works with.
     *
     * @param clientBuilder  builder used to create the Pulsar client for the native consume path
     * @param authentication authentication plugin used for the WebSocket path; may be {@code null}
     * @param str            service URL; a URL starting with {@code "ws"} selects the WebSocket path
     */
    public void updateConfig(ClientBuilder clientBuilder, Authentication authentication, String str) {
        this.serviceURL = str;
        this.authentication = authentication;
        this.clientBuilder = clientBuilder;
    }

    /**
     * Renders a message as a single line of the form
     * {@code key:[<key>], properties:[...], content:<payload>}.
     *
     * @param message the received message
     * @param z       when {@code true}, binary payloads are rendered as a hex dump
     * @return the printable representation of the message
     * @throws IOException if the hex dump cannot be produced
     */
    private String interpretMessage(Message<?> message, boolean z) throws IOException {
        String properties = Arrays.toString(message.getProperties().entrySet().toArray());

        Object value = message.getValue();
        String content;
        if (value == null) {
            content = "null";
        } else if (value instanceof byte[]) {
            content = interpretByteArray(z, (byte[]) value);
        } else if (value instanceof GenericObject) {
            content = genericObjectToMap((GenericObject) value, z).toString();
        } else if (value instanceof ByteBuffer) {
            // ByteBuffer payloads go through the same rendering as byte[] payloads so that the
            // hex flag is honoured for them as well.
            content = interpretByteArray(z, DefaultImplementation.getBytes((ByteBuffer) value));
        } else {
            content = value.toString();
        }

        // A message without a key is printed as "key:[null]".
        String key = message.hasKey() ? message.getKey() : null;

        StringBuilder sb = new StringBuilder();
        sb.append("key:[").append(key).append("], ");
        // Arrays.toString never yields an empty string (an empty array renders as "[]"),
        // so the properties section is always part of the output.
        sb.append("properties:").append(properties).append(", ");
        sb.append("content:").append(content);
        return sb.toString();
    }

    /**
     * Converts a binary payload into printable text.
     *
     * @param z    {@code true} to produce a hex dump, {@code false} to decode the bytes as UTF-8 text
     * @param bArr the payload; must not be {@code null}
     * @return the hex dump or the decoded text; an empty string for an empty payload
     * @throws IOException if writing the hex dump fails
     */
    private static String interpretByteArray(boolean z, byte[] bArr) throws IOException {
        if (!z) {
            // Decode with an explicit charset so the output does not depend on the platform default.
            return new String(bArr, StandardCharsets.UTF_8);
        }
        if (bArr.length == 0) {
            // HexDump.dump rejects a start index that is not inside the array, which is
            // always the case for an empty array; there is nothing to dump anyway.
            return "";
        }
        ByteArrayOutputStream dump = new ByteArrayOutputStream();
        HexDump.dump(bArr, 0L, dump, 0);
        return dump.toString();
    }

    /**
     * Converts a schema-aware value into a map suitable for printing.
     *
     * <p>AVRO, JSON and PROTOBUF_NATIVE values are treated as records, KEY_VALUE values as a
     * key/value pair, and every other schema type as a single primitive value.
     *
     * @param genericObject the value to convert
     * @param z             when {@code true}, binary content is rendered as a hex dump
     * @return the map representation of the value
     * @throws IOException if a hex dump cannot be produced
     */
    private static Map<String, Object> genericObjectToMap(GenericObject genericObject, boolean z) throws IOException {
        // Switch on the enum itself rather than on an ordinal-indexed lookup table.
        switch (genericObject.getSchemaType()) {
            case AVRO:
            case JSON:
            case PROTOBUF_NATIVE:
                return genericRecordToMap((GenericRecord) genericObject, z);
            case KEY_VALUE:
                return keyValueToMap((KeyValue<?, ?>) genericObject.getNativeObject(), z);
            default:
                return primitiveValueToMap(genericObject.getNativeObject(), z);
        }
    }

    /**
     * Converts a key/value pair into a map with the entries {@code "key"} and {@code "value"}.
     *
     * @param keyValue the pair to convert; {@code null} yields {@code {value=NULL}}
     * @param z        when {@code true}, binary content is rendered as a hex dump
     * @return the map representation of the pair
     * @throws IOException if a hex dump cannot be produced
     */
    private static Map<String, Object> keyValueToMap(KeyValue keyValue, boolean z) throws IOException {
        if (keyValue == null) {
            return ImmutableMap.of("value", "NULL");
        }
        Map<String, Object> keyPart = primitiveValueToMap(keyValue.getKey(), z);
        Map<String, Object> valuePart = primitiveValueToMap(keyValue.getValue(), z);
        return ImmutableMap.of("key", keyPart, "value", valuePart);
    }

    /**
     * Converts a single value into a map holding its text under {@code "value"} and its class
     * under {@code "type"}.
     *
     * <p>{@code null} yields {@code {value=NULL}}; nested generic objects are delegated to
     * {@code genericObjectToMap}; byte arrays are first turned into text, so their reported
     * type is that of the resulting string.
     *
     * @param obj the value to convert; may be {@code null}
     * @param z   when {@code true}, byte arrays are rendered as a hex dump
     * @return the map representation of the value
     * @throws IOException if a hex dump cannot be produced
     */
    private static Map<String, Object> primitiveValueToMap(Object obj, boolean z) throws IOException {
        if (obj == null) {
            return ImmutableMap.of("value", "NULL");
        }
        if (obj instanceof GenericObject) {
            return genericObjectToMap((GenericObject) obj, z);
        }
        Object printable = (obj instanceof byte[]) ? interpretByteArray(z, (byte[]) obj) : obj;
        return ImmutableMap.of("value", printable.toString(), "type", printable.getClass());
    }

    /**
     * Converts a record into a map from field name to field value.
     *
     * <p>Nested records are converted recursively, {@code null} fields are stored as the string
     * {@code "NULL"}, byte arrays are turned into text, and all other values are stored as they are.
     *
     * @param genericRecord the record to convert
     * @param z             when {@code true}, byte arrays are rendered as a hex dump
     * @return a mutable map with one entry per field of the record
     * @throws IOException if a hex dump cannot be produced
     */
    private static Map<String, Object> genericRecordToMap(GenericRecord genericRecord, boolean z) throws IOException {
        Map<String, Object> result = new HashMap<>();
        for (Field field : genericRecord.getFields()) {
            Object raw = genericRecord.getField(field);
            Object rendered;
            if (raw == null) {
                rendered = "NULL";
            } else if (raw instanceof GenericRecord) {
                rendered = genericRecordToMap((GenericRecord) raw, z);
            } else if (raw instanceof byte[]) {
                rendered = interpretByteArray(z, (byte[]) raw);
            } else {
                rendered = raw;
            }
            result.put(field.getName(), rendered);
        }
        return result;
    }

    /**
     * Validates the parsed options and consumes from the single topic given on the command line.
     *
     * <p>A service URL starting with {@code "ws"} selects the WebSocket path; every other URL
     * selects the native client path.
     *
     * @return the result of the selected consume path: 0 on success, -1 on failure
     * @throws ParameterException if not exactly one topic was given, the subscription name is
     *                            missing or empty, or the message count is negative
     */
    public int run() throws PulsarClientException, IOException {
        if (this.mainOptions.size() != 1) {
            throw new ParameterException("Please provide one and only one topic name.");
        }
        if (StringUtils.isEmpty(this.subscriptionName)) {
            throw new ParameterException("Subscription name is not provided.");
        }
        if (this.numMessagesToConsume < 0) {
            throw new ParameterException("Number of messages should be zero or positive.");
        }

        final String topic = this.mainOptions.get(0);
        if (this.serviceURL.startsWith("ws")) {
            return consumeFromWebSocket(topic);
        }
        return consume(topic);
    }

    /**
     * Consumes messages through the native Pulsar client and prints them to stdout.
     *
     * <p>The loop runs until {@code numMessagesToConsume} messages were received, or forever when
     * that limit is 0. Each message is acknowledged after it was printed and is always released.
     * The client and the consumer are closed on every exit path, and the number of messages that
     * were actually received is logged on every exit path.
     *
     * @param str topic name, or a regular expression over topic names when {@code isRegex} is set
     * @return 0 on success, -1 if any exception occurred
     */
    private int consume(String str) {
        int numMessagesConsumed = 0;
        int returnCode = 0;

        try (PulsarClient client = this.clientBuilder.build()) {
            Schema<?> schema = this.poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
            if ("auto_consume".equals(this.schematype)) {
                schema = Schema.AUTO_CONSUME();
            } else if (!"bytes".equals(this.schematype)) {
                throw new IllegalArgumentException("schema type must be 'bytes' or 'auto_consume");
            }

            ConsumerBuilder<?> builder = client.newConsumer(schema)
                    .subscriptionName(this.subscriptionName)
                    .subscriptionType(this.subscriptionType)
                    .subscriptionMode(this.subscriptionMode)
                    .subscriptionInitialPosition(this.subscriptionInitialPosition)
                    .poolMessages(this.poolMessages);

            if (this.isRegex) {
                builder.topicsPattern(Pattern.compile(str));
            } else {
                builder.topic(str);
            }
            // Sizes of 0 (the defaults) leave the client's own defaults untouched.
            if (this.maxPendingChunkedMessage > 0) {
                builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
            }
            if (this.receiverQueueSize > 0) {
                builder.receiverQueueSize(this.receiverQueueSize);
            }
            builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
            if (StringUtils.isNotBlank(this.encKeyValue)) {
                builder.defaultCryptoKeyReader(this.encKeyValue);
            }

            try (Consumer<?> consumer = builder.subscribe()) {
                RateLimiter limiter = this.consumeRate > 0.0d ? RateLimiter.create(this.consumeRate) : null;
                while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
                    if (limiter != null) {
                        limiter.acquire();
                    }

                    Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
                    if (msg == null) {
                        LOG.debug("No message to consume after waiting for 5 seconds.");
                        continue;
                    }

                    try {
                        numMessagesConsumed++;
                        if (!this.hideContent) {
                            System.out.println(MESSAGE_BOUNDARY);
                            System.out.println(interpretMessage(msg, this.displayHex));
                        } else if (numMessagesConsumed % 1000 == 0) {
                            // With hidden content only a progress line per 1000 messages is printed.
                            System.out.println("Received " + numMessagesConsumed + " messages");
                        }
                        consumer.acknowledge(msg);
                    } finally {
                        // Release exactly once, whether or not printing/acknowledging succeeded.
                        msg.release();
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Error while consuming messages");
            LOG.error(e.getMessage(), e);
            returnCode = -1;
        } finally {
            // Report the real count on every exit path, including failures.
            LOG.info("{} messages successfully consumed", numMessagesConsumed);
        }

        return returnCode;
    }

    /**
     * Consumes messages through the WebSocket consumer endpoint and prints them to stdout.
     *
     * <p>Steps: build the consumer URI, copy the authentication HTTP headers (if any) onto the
     * upgrade request, start the WebSocket client, connect, then poll received frames until
     * {@code numMessagesToConsume} frames were printed (or forever when that limit is 0).
     * The WebSocket client is stopped on every exit path once it was started.
     *
     * @param str topic name
     * @return 0 on success, -1 on any failure
     */
    private int consumeFromWebSocket(String str) {
        int numMessagesConsumed = 0;
        int returnCode = 0;

        TopicName topicName = TopicName.get(str);
        // The cluster segment is only part of the path when the topic name carries a cluster.
        String clusterSegment = StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/";
        String consumerPath = String.format(
                "%s/%s/" + clusterSegment + "%s/%s/%s?subscriptionType=%s&subscriptionMode=%s",
                topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(),
                topicName.getLocalName(), this.subscriptionName, this.subscriptionType.toString(),
                this.subscriptionMode.toString());
        String baseUrl = this.serviceURL + (this.serviceURL.endsWith("/") ? "" : "/");
        URI consumerUri = URI.create(baseUrl + "ws/consumer/" + consumerPath);

        WebSocketClient consumeClient = new WebSocketClient(new SslContextFactory(true));
        ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();

        try {
            if (this.authentication != null) {
                this.authentication.start();
                AuthenticationDataProvider authData = this.authentication.getAuthData();
                if (authData.hasDataForHttp()) {
                    for (Map.Entry<String, String> header : authData.getHttpHeaders()) {
                        consumeRequest.setHeader(header.getKey(), header.getValue());
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Authentication plugin error: " + e.getMessage(), e);
            return -1;
        }

        CompletableFuture<Void> connected = new CompletableFuture<>();
        ConsumerSocket consumerSocket = new ConsumerSocket(connected);

        try {
            try {
                consumeClient.start();
            } catch (Exception e) {
                LOG.error("Failed to start websocket-client", e);
                return -1;
            }

            try {
                LOG.info("Trying to create websocket session..{}", consumerUri);
                // Wait on the future returned by connect() first: it is the one that reports a
                // failed connection attempt, whereas 'connected' is only completed on success.
                consumeClient.connect(consumerSocket, consumerUri, consumeRequest).get();
                connected.get();
            } catch (Exception e) {
                restoreInterruptIfNeeded(e);
                LOG.error("Failed to create web-socket session", e);
                return -1;
            }

            try {
                RateLimiter limiter = this.consumeRate > 0.0d ? RateLimiter.create(this.consumeRate) : null;
                while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
                    if (limiter != null) {
                        limiter.acquire();
                    }
                    String msg = consumerSocket.receive(5L, TimeUnit.SECONDS);
                    if (msg == null) {
                        LOG.debug("No message to consume after waiting for 5 seconds.");
                        continue;
                    }
                    try {
                        System.out.println(interpretByteArray(this.displayHex, Base64.getDecoder().decode(msg)));
                    } catch (Exception ignored) {
                        // Best effort: a frame that cannot be decoded/rendered is printed as received.
                        System.out.println(msg);
                    }
                    numMessagesConsumed++;
                }
                consumerSocket.awaitClose(2, TimeUnit.SECONDS);
            } catch (Exception e) {
                restoreInterruptIfNeeded(e);
                LOG.error("Error while consuming messages");
                LOG.error(e.getMessage(), e);
                returnCode = -1;
            } finally {
                LOG.info("{} messages successfully consumed", numMessagesConsumed);
            }
            return returnCode;
        } finally {
            stopQuietly(consumeClient);
        }
    }

    /** Re-asserts the thread's interrupt flag when the caught exception was an interruption. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the WebSocket client; a failure to stop is logged and otherwise ignored. */
    private static void stopQuietly(WebSocketClient client) {
        try {
            client.stop();
        } catch (Exception e) {
            LOG.warn("Failed to stop websocket-client", e);
        }
    }
}
