package org.apache.pulsar.sql.presto;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.BoundType;
import io.airlift.log.Logger;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.StandardErrorCode;
import io.prestosql.spi.block.Block;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitManager;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedSplitSource;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Range;
import io.prestosql.spi.predicate.TupleDomain;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.inject.Inject;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

/* loaded from: input_file:org/apache/pulsar/sql/presto/PulsarSplitManager.class */
public class PulsarSplitManager implements ConnectorSplitManager {
    private final String connectorId;
    private final PulsarConnectorConfig pulsarConnectorConfig;
    private final PulsarAdmin pulsarAdmin;
    private static final Logger log = Logger.get(PulsarSplitManager.class);
    private ObjectMapper objectMapper = new ObjectMapper();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/sql/presto/PulsarSplitManager$PredicatePushdownInfo.class */
    public static class PredicatePushdownInfo {
        private PositionImpl startPosition;
        private PositionImpl endPosition;
        private long numOfEntries;

        private PredicatePushdownInfo(PositionImpl positionImpl, PositionImpl positionImpl2, long j) {
            this.startPosition = positionImpl;
            this.endPosition = positionImpl2;
            this.numOfEntries = j;
        }

        public static PredicatePushdownInfo getPredicatePushdownInfo(String str, TupleDomain<ColumnHandle> tupleDomain, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, String str2, long j) throws ManagedLedgerException, InterruptedException {
            Domain domain;
            PositionImpl findPosition;
            PositionImpl findPosition2;
            ReadOnlyCursor readOnlyCursor = null;
            try {
                readOnlyCursor = managedLedgerFactory.openReadOnlyCursor(str2, PositionImpl.earliest, managedLedgerConfig);
                if (!tupleDomain.getDomains().isPresent() || (domain = (Domain) ((Map) tupleDomain.getDomains().get()).get(PulsarInternalColumn.PUBLISH_TIME.getColumnHandle(str, false))) == null || domain.getValues().getRanges().getRangeCount() != 1) {
                    if (readOnlyCursor == null) {
                        return null;
                    }
                    readOnlyCursor.close();
                    return null;
                }
                Preconditions.checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
                Long l = null;
                Long l2 = null;
                Range range = (Range) domain.getValues().getRanges().getOrderedRanges().get(0);
                if (!range.getHigh().isUpperUnbounded()) {
                    l = Long.valueOf(new Timestamp(((Block) range.getHigh().getValueBlock().get()).getLong(0, 0)).getTime());
                }
                if (!range.getLow().isLowerUnbounded()) {
                    l2 = Long.valueOf(new Timestamp(((Block) range.getLow().getValueBlock().get()).getLong(0, 0)).getTime());
                }
                if (l2 == null) {
                    findPosition = (PositionImpl) readOnlyCursor.getReadPosition();
                } else {
                    findPosition = PulsarSplitManager.findPosition(readOnlyCursor, l2.longValue());
                    if (findPosition == null) {
                        findPosition = (PositionImpl) readOnlyCursor.getReadPosition();
                    }
                }
                if (l == null) {
                    readOnlyCursor.skipEntries(Math.toIntExact(j));
                    findPosition2 = (PositionImpl) readOnlyCursor.getReadPosition();
                } else {
                    findPosition2 = PulsarSplitManager.findPosition(readOnlyCursor, l.longValue());
                    if (findPosition2 == null) {
                        findPosition2 = findPosition;
                    }
                }
                PredicatePushdownInfo predicatePushdownInfo = new PredicatePushdownInfo(findPosition, findPosition2, readOnlyCursor.getNumberOfEntries(com.google.common.collect.Range.range(findPosition, BoundType.CLOSED, findPosition2, BoundType.CLOSED)) - 1);
                PulsarSplitManager.log.debug("Predicate pushdown optimization calculated: %s", new Object[]{predicatePushdownInfo});
                if (readOnlyCursor != null) {
                    readOnlyCursor.close();
                }
                return predicatePushdownInfo;
            } catch (Throwable th) {
                if (readOnlyCursor != null) {
                    readOnlyCursor.close();
                }
                throw th;
            }
        }

        public PositionImpl getStartPosition() {
            return this.startPosition;
        }

        public PositionImpl getEndPosition() {
            return this.endPosition;
        }

        public long getNumOfEntries() {
            return this.numOfEntries;
        }

        public void setStartPosition(PositionImpl positionImpl) {
            this.startPosition = positionImpl;
        }

        public void setEndPosition(PositionImpl positionImpl) {
            this.endPosition = positionImpl;
        }

        public void setNumOfEntries(long j) {
            this.numOfEntries = j;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof PredicatePushdownInfo)) {
                return false;
            }
            PredicatePushdownInfo predicatePushdownInfo = (PredicatePushdownInfo) obj;
            if (!predicatePushdownInfo.canEqual(this) || getNumOfEntries() != predicatePushdownInfo.getNumOfEntries()) {
                return false;
            }
            PositionImpl startPosition = getStartPosition();
            PositionImpl startPosition2 = predicatePushdownInfo.getStartPosition();
            if (startPosition == null) {
                if (startPosition2 != null) {
                    return false;
                }
            } else if (!startPosition.equals(startPosition2)) {
                return false;
            }
            PositionImpl endPosition = getEndPosition();
            PositionImpl endPosition2 = predicatePushdownInfo.getEndPosition();
            return endPosition == null ? endPosition2 == null : endPosition.equals(endPosition2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof PredicatePushdownInfo;
        }

        public int hashCode() {
            long numOfEntries = getNumOfEntries();
            int i = (1 * 59) + ((int) ((numOfEntries >>> 32) ^ numOfEntries));
            PositionImpl startPosition = getStartPosition();
            int hashCode = (i * 59) + (startPosition == null ? 43 : startPosition.hashCode());
            PositionImpl endPosition = getEndPosition();
            return (hashCode * 59) + (endPosition == null ? 43 : endPosition.hashCode());
        }

        public String toString() {
            return "PulsarSplitManager.PredicatePushdownInfo(startPosition=" + getStartPosition() + ", endPosition=" + getEndPosition() + ", numOfEntries=" + getNumOfEntries() + ")";
        }
    }

    @Inject
    public PulsarSplitManager(PulsarConnectorId pulsarConnectorId, PulsarConnectorConfig pulsarConnectorConfig) {
        this.connectorId = ((PulsarConnectorId) Objects.requireNonNull(pulsarConnectorId, "connectorId is null")).toString();
        this.pulsarConnectorConfig = (PulsarConnectorConfig) Objects.requireNonNull(pulsarConnectorConfig, "pulsarConnectorConfig is null");
        try {
            this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
        } catch (PulsarClientException e) {
            log.error(e);
            throw new RuntimeException((Throwable) e);
        }
    }

    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableLayoutHandle connectorTableLayoutHandle, ConnectorSplitManager.SplitSchedulingStrategy splitSchedulingStrategy) {
        SchemaInfo defaultSchema;
        Collection<PulsarSplit> splitsPartitionedTopic;
        int targetNumSplits = this.pulsarConnectorConfig.getTargetNumSplits();
        PulsarTableLayoutHandle pulsarTableLayoutHandle = (PulsarTableLayoutHandle) connectorTableLayoutHandle;
        PulsarTableHandle table = pulsarTableLayoutHandle.getTable();
        TupleDomain<ColumnHandle> tupleDomain = pulsarTableLayoutHandle.getTupleDomain();
        String restoreNamespaceDelimiterIfNeeded = PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded(table.getSchemaName(), this.pulsarConnectorConfig);
        TopicName topicName = TopicName.get("persistent", NamespaceName.get(restoreNamespaceDelimiterIfNeeded), table.getTopicName());
        try {
            defaultSchema = this.pulsarAdmin.schemas().getSchemaInfo(String.format("%s/%s", restoreNamespaceDelimiterIfNeeded, table.getTopicName()));
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 401) {
                throw new PrestoException(StandardErrorCode.QUERY_REJECTED, String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized", restoreNamespaceDelimiterIfNeeded, table.getTopicName()));
            }
            if (e.getStatusCode() != 404) {
                throw new RuntimeException("Failed to get pulsar topic schema for topic " + String.format("%s/%s", restoreNamespaceDelimiterIfNeeded, table.getTopicName()) + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
            }
            defaultSchema = PulsarSqlSchemaInfoProvider.defaultSchema();
        }
        try {
            OffloadPoliciesImpl offloadPoliciesImpl = (OffloadPoliciesImpl) this.pulsarAdmin.namespaces().getOffloadPolicies(topicName.getNamespace());
            if (offloadPoliciesImpl != null) {
                offloadPoliciesImpl.setOffloadersDirectory(this.pulsarConnectorConfig.getOffloadersDirectory());
                offloadPoliciesImpl.setManagedLedgerOffloadMaxThreads(Integer.valueOf(this.pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()));
            }
            if (PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
                splitsPartitionedTopic = getSplitsPartitionedTopic(targetNumSplits, topicName, table, defaultSchema, tupleDomain, offloadPoliciesImpl);
                log.debug("Splits for partitioned topic %s: %s", new Object[]{topicName, splitsPartitionedTopic});
            } else {
                splitsPartitionedTopic = getSplitsNonPartitionedTopic(targetNumSplits, topicName, table, defaultSchema, tupleDomain, offloadPoliciesImpl);
                log.debug("Splits for non-partitioned topic %s: %s", new Object[]{topicName, splitsPartitionedTopic});
            }
            return new FixedSplitSource(splitsPartitionedTopic);
        } catch (Exception e2) {
            log.error(e2, "Failed to get splits");
            throw new RuntimeException(e2);
        }
    }

    @VisibleForTesting
    Collection<PulsarSplit> getSplitsPartitionedTopic(int i, TopicName topicName, PulsarTableHandle pulsarTableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain, OffloadPoliciesImpl offloadPoliciesImpl) throws Exception {
        List<Integer> predicatedPartitions = getPredicatedPartitions(topicName, tupleDomain);
        if (log.isDebugEnabled()) {
            log.debug("Partition filter result %s", new Object[]{predicatedPartitions});
        }
        int max = Math.max(predicatedPartitions.size(), i);
        int size = max / predicatedPartitions.size();
        int size2 = max % predicatedPartitions.size();
        PulsarConnectorCache connectorCache = PulsarConnectorCache.getConnectorCache(this.pulsarConnectorConfig);
        ManagedLedgerFactory managedLedgerFactory = connectorCache.getManagedLedgerFactory();
        ManagedLedgerConfig managedLedgerConfig = connectorCache.getManagedLedgerConfig(topicName.getNamespaceObject(), offloadPoliciesImpl, this.pulsarConnectorConfig);
        LinkedList linkedList = new LinkedList();
        int i2 = 0;
        while (i2 < predicatedPartitions.size()) {
            linkedList.addAll(getSplitsForTopic(topicName.getPartition(predicatedPartitions.get(i2).intValue()).getPersistenceNamingEncoding(), managedLedgerFactory, managedLedgerConfig, size2 > i2 ? size + 1 : size, pulsarTableHandle, schemaInfo, topicName.getPartition(predicatedPartitions.get(i2).intValue()).getLocalName(), tupleDomain, offloadPoliciesImpl));
            i2++;
        }
        return linkedList;
    }

    private List<Integer> getPredicatedPartitions(TopicName topicName, TupleDomain<ColumnHandle> tupleDomain) {
        try {
            int i = this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString()).partitions;
            ArrayList arrayList = new ArrayList();
            if (tupleDomain.getDomains().isPresent()) {
                Domain domain = (Domain) ((Map) tupleDomain.getDomains().get()).get(PulsarInternalColumn.PARTITION.getColumnHandle(this.connectorId, false));
                if (domain != null) {
                    domain.getValues().getValuesProcessor().consume(ranges -> {
                        domain.getValues().getRanges().getOrderedRanges().forEach(range -> {
                            Integer num = 0;
                            Integer valueOf = Integer.valueOf(i);
                            if (!range.getLow().isLowerUnbounded() && range.getLow().getValueBlock().isPresent()) {
                                num = Integer.valueOf(((Block) range.getLow().getValueBlock().get()).getInt(0, 0));
                            }
                            if (!range.getHigh().isLowerUnbounded() && range.getHigh().getValueBlock().isPresent()) {
                                valueOf = Integer.valueOf(((Block) range.getHigh().getValueBlock().get()).getInt(0, 0));
                            }
                            for (int intValue = num.intValue(); intValue <= valueOf.intValue(); intValue++) {
                                arrayList.add(Integer.valueOf(intValue));
                            }
                        });
                    }, discreteValues -> {
                    }, allOrNone -> {
                    });
                } else {
                    for (int i2 = 0; i2 < i; i2++) {
                        arrayList.add(Integer.valueOf(i2));
                    }
                }
            } else {
                for (int i3 = 0; i3 < i; i3++) {
                    arrayList.add(Integer.valueOf(i3));
                }
            }
            return arrayList;
        } catch (PulsarAdminException e) {
            if (e.getStatusCode() == 401) {
                throw new PrestoException(StandardErrorCode.QUERY_REJECTED, String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
            }
            throw new RuntimeException("Failed to get metadata for partitioned topic " + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
        }
    }

    @VisibleForTesting
    Collection<PulsarSplit> getSplitsNonPartitionedTopic(int i, TopicName topicName, PulsarTableHandle pulsarTableHandle, SchemaInfo schemaInfo, TupleDomain<ColumnHandle> tupleDomain, OffloadPoliciesImpl offloadPoliciesImpl) throws Exception {
        PulsarConnectorCache connectorCache = PulsarConnectorCache.getConnectorCache(this.pulsarConnectorConfig);
        return getSplitsForTopic(topicName.getPersistenceNamingEncoding(), connectorCache.getManagedLedgerFactory(), connectorCache.getManagedLedgerConfig(topicName.getNamespaceObject(), offloadPoliciesImpl, this.pulsarConnectorConfig), i, pulsarTableHandle, schemaInfo, topicName.getLocalName(), tupleDomain, offloadPoliciesImpl);
    }

    @VisibleForTesting
    Collection<PulsarSplit> getSplitsForTopic(String str, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, int i, PulsarTableHandle pulsarTableHandle, SchemaInfo schemaInfo, String str2, TupleDomain<ColumnHandle> tupleDomain, OffloadPoliciesImpl offloadPoliciesImpl) throws ManagedLedgerException, InterruptedException, IOException {
        PositionImpl positionImpl;
        ReadOnlyCursor readOnlyCursor = null;
        try {
            ReadOnlyCursor openReadOnlyCursor = managedLedgerFactory.openReadOnlyCursor(str, PositionImpl.earliest, managedLedgerConfig);
            long numberOfEntries = openReadOnlyCursor.getNumberOfEntries();
            if (numberOfEntries <= 0) {
                List list = Collections.EMPTY_LIST;
                if (openReadOnlyCursor != null) {
                    try {
                        openReadOnlyCursor.close();
                    } catch (Exception e) {
                        log.error(e);
                    }
                }
                return list;
            }
            PredicatePushdownInfo predicatePushdownInfo = PredicatePushdownInfo.getPredicatePushdownInfo(this.connectorId, tupleDomain, managedLedgerFactory, managedLedgerConfig, str, numberOfEntries);
            if (predicatePushdownInfo != null) {
                numberOfEntries = predicatePushdownInfo.getNumOfEntries();
                positionImpl = predicatePushdownInfo.getStartPosition();
            } else {
                positionImpl = (PositionImpl) openReadOnlyCursor.getReadPosition();
            }
            openReadOnlyCursor.close();
            ReadOnlyCursor openReadOnlyCursor2 = managedLedgerFactory.openReadOnlyCursor(str, positionImpl, new ManagedLedgerConfig());
            long j = numberOfEntries % i;
            long j2 = numberOfEntries / i;
            LinkedList linkedList = new LinkedList();
            for (int i2 = 0; i2 < i; i2++) {
                long j3 = j > ((long) i2) ? j2 + 1 : j2;
                PositionImpl positionImpl2 = (PositionImpl) openReadOnlyCursor2.getReadPosition();
                openReadOnlyCursor2.skipEntries(Math.toIntExact(j3));
                PositionImpl positionImpl3 = (PositionImpl) openReadOnlyCursor2.getReadPosition();
                linkedList.add(new PulsarSplit(i2, this.connectorId, PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded(pulsarTableHandle.getSchemaName(), this.pulsarConnectorConfig), schemaInfo.getName(), str2, j3, new String(schemaInfo.getSchema(), "ISO8859-1"), schemaInfo.getType(), positionImpl2.getEntryId(), positionImpl3.getEntryId(), positionImpl2.getLedgerId(), positionImpl3.getLedgerId(), tupleDomain, this.objectMapper.writeValueAsString(schemaInfo.getProperties()), offloadPoliciesImpl));
            }
            if (openReadOnlyCursor2 != null) {
                try {
                    openReadOnlyCursor2.close();
                } catch (Exception e2) {
                    log.error(e2);
                }
            }
            return linkedList;
        } catch (Throwable th) {
            if (0 != 0) {
                try {
                    readOnlyCursor.close();
                } catch (Exception e3) {
                    log.error(e3);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static PositionImpl findPosition(ReadOnlyCursor readOnlyCursor, final long j) throws ManagedLedgerException, InterruptedException {
        return (PositionImpl) readOnlyCursor.findNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, new Predicate<Entry>() { // from class: org.apache.pulsar.sql.presto.PulsarSplitManager.1
            public boolean apply(Entry entry) {
                try {
                    try {
                        boolean z = MessageImpl.getEntryTimestamp(entry.getDataBuffer()) <= j;
                        entry.release();
                        return z;
                    } catch (Exception e) {
                        PulsarSplitManager.log.error(e, "Failed To deserialize message when finding position with error: %s", new Object[]{e});
                        entry.release();
                        return false;
                    }
                } catch (Throwable th) {
                    entry.release();
                    throw th;
                }
            }
        });
    }
}
