package org.apache.bookkeeper.mledger.offload.filesystem.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Recycler;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.offload.OffloadUtils;
import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.class */
/**
 * {@link LedgerOffloader} that copies ledgers into Hadoop {@link MapFile}s on the file system selected by
 * the Hadoop {@link Configuration} ({@code fs.defaultFS}).
 *
 * <p>Every ledger is written to its own map file located at
 * {@code <storageBasePath>/<managedLedgerName>/<ledgerId>-<uuid>}. Entries are keyed by entry id and the
 * serialized ledger metadata is stored under the reserved key {@link #METADATA_KEY_INDEX}.
 *
 * <p>Reading from BookKeeper happens on the scheduler passed in by the caller; writing to the map file
 * happens on a dedicated "offload-assignment" scheduler owned (and shut down) by this instance.
 */
public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
    private static final Logger log = LoggerFactory.getLogger(FileSystemManagedLedgerOffloader.class);

    /** Key under which the storage base path is published in the driver metadata. */
    private static final String STORAGE_BASE_PATH = "storageBasePath";
    /** The only driver name accepted by {@link #driverSupported(String)}. */
    private static final String DRIVER_NAMES = "filesystem";
    /** Key of the managed ledger name inside the extra metadata map handed to every operation. */
    private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName";
    /** Map file key that holds the serialized ledger metadata instead of an entry. */
    static final long METADATA_KEY_INDEX = -1;
    /** Maximum number of entries requested from the ledger in a single read. */
    private static final long ENTRIES_PER_READ = 100;

    private final Configuration configuration = new Configuration();
    private final String driverName;
    private final String storageBasePath;
    private final FileSystem fileSystem;
    private final OrderedScheduler scheduler;
    private final OrderedScheduler assignmentScheduler;
    private final OffloadPolicies offloadPolicies;
    private final LedgerOffloaderStats offloaderStats;

    /**
     * Recyclable task that appends one batch of ledger entries to the shared map file writer.
     *
     * <p>When it finishes (successfully or not) it counts down the batch latch, closes the batch, returns
     * its prefetch permit and hands itself back to the recycler.
     */
    public static class FileSystemWriter implements Runnable {
        private static final Recycler<FileSystemWriter> RECYCLER = new Recycler<FileSystemWriter>() {
            @Override
            protected FileSystemWriter newObject(Recycler.Handle<FileSystemWriter> handle) {
                return new FileSystemWriter(handle);
            }
        };

        // Reused across recycles: each append overwrites them before they are handed to the writer.
        private final LongWritable key = new LongWritable();
        private final BytesWritable value = new BytesWritable();
        private final Recycler.Handle<FileSystemWriter> recyclerHandle;

        private LedgerEntries ledgerEntriesOnce;
        private MapFile.Writer dataWriter;
        private CountDownLatch countDownLatch;
        private AtomicLong haveOffloadEntryNumber;
        private LedgerReader ledgerReader;
        private Semaphore semaphore;

        private FileSystemWriter(Recycler.Handle<FileSystemWriter> handle) {
            this.recyclerHandle = handle;
        }

        /** Drops every per-batch reference and returns this instance to the recycler. */
        private void recycle() {
            this.dataWriter = null;
            this.countDownLatch = null;
            this.haveOffloadEntryNumber = null;
            this.ledgerReader = null;
            this.ledgerEntriesOnce = null;
            this.semaphore = null;
            this.recyclerHandle.recycle(this);
        }

        /**
         * Obtains a (possibly recycled) writer task for one batch.
         *
         * @param ledgerEntries batch to append; closed by the task when it finishes
         * @param writer map file writer shared by all batches of the ledger
         * @param semaphore prefetch limiter; one permit is released when the task finishes
         * @param countDownLatch counted down once when the task finishes
         * @param atomicLong counter incremented for every entry that was appended
         * @param ledgerReader owning reader; receives the write failure, if any
         * @return the initialized task
         */
        public static FileSystemWriter create(LedgerEntries ledgerEntries, MapFile.Writer writer,
                Semaphore semaphore, CountDownLatch countDownLatch, AtomicLong atomicLong,
                LedgerReader ledgerReader) {
            FileSystemWriter task = RECYCLER.get();
            task.ledgerReader = ledgerReader;
            task.dataWriter = writer;
            task.countDownLatch = countDownLatch;
            task.haveOffloadEntryNumber = atomicLong;
            task.ledgerEntriesOnce = ledgerEntries;
            task.semaphore = semaphore;
            return task;
        }

        /**
         * Appends the batch unless an earlier batch already failed, then always signals completion.
         */
        @Override
        public void run() {
            try {
                String topicName = TopicName.fromPersistenceNamingEncoding(
                        ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME));
                if (ledgerReader.fileSystemWriteException == null) {
                    for (LedgerEntry entry : ledgerEntriesOnce) {
                        key.set(entry.getEntryId());
                        try {
                            byte[] payload = entry.getEntryBytes();
                            int payloadSize = payload.length;
                            value.set(payload, 0, payloadSize);
                            dataWriter.append(key, value);
                            haveOffloadEntryNumber.incrementAndGet();
                            ledgerReader.offloaderStats.recordOffloadBytes(topicName, payloadSize);
                        } catch (IOException e) {
                            ledgerReader.fileSystemWriteException = e;
                            ledgerReader.offloaderStats.recordWriteToStorageError(topicName);
                            // The offload has failed: do not keep appending to the broken writer, which
                            // would also overwrite the first failure and inflate the error metric.
                            break;
                        }
                    }
                }
            } catch (RuntimeException e) {
                // Record unexpected failures as well; otherwise the reader, which only inspects
                // fileSystemWriteException, would report this offload as successful.
                ledgerReader.fileSystemWriteException = e;
            } finally {
                // Must run on every path: the reader blocks on the latch and on the semaphore.
                countDownLatch.countDown();
                ledgerEntriesOnce.close();
                semaphore.release();
                recycle();
            }
        }
    }

    /**
     * Task that offloads a single ledger: it reads the ledger in batches of {@link #ENTRIES_PER_READ}
     * entries and hands every batch to a {@link FileSystemWriter} running on the assignment scheduler.
     * The promise passed to the constructor is completed when the whole ledger was written or failed.
     */
    public static class LedgerReader implements Runnable {
        private final ReadHandle readHandle;
        private final UUID uuid;
        private final Map<String, String> extraMetadata;
        private final CompletableFuture<Void> promise;
        private final String storageBasePath;
        private final Configuration configuration;
        private final OrderedScheduler assignmentScheduler;
        private final int managedLedgerOffloadPrefetchRounds;
        private final LedgerOffloaderStats offloaderStats;
        /** First failure reported by a writer task; written by writer threads, read by the reader thread. */
        volatile Exception fileSystemWriteException = null;

        private LedgerReader(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata,
                CompletableFuture<Void> promise, String storageBasePath, Configuration configuration,
                OrderedScheduler assignmentScheduler, int managedLedgerOffloadPrefetchRounds,
                LedgerOffloaderStats offloaderStats) {
            this.readHandle = readHandle;
            this.uuid = uuid;
            this.extraMetadata = extraMetadata;
            this.promise = promise;
            this.storageBasePath = storageBasePath;
            this.configuration = configuration;
            this.assignmentScheduler = assignmentScheduler;
            this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds;
            this.offloaderStats = offloaderStats;
        }

        /**
         * Copies the ledger into its map file and completes the promise.
         *
         * <p>The promise fails with {@link IllegalArgumentException} for an empty or still open ledger, and
         * with the underlying exception when reading, writing or closing the map file fails.
         */
        @Override
        public void run() {
            if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
                promise.completeExceptionally(
                        new IllegalArgumentException("An empty or open ledger should never be offloaded"));
                return;
            }
            final long ledgerId = readHandle.getId();
            final String managedLedgerName = extraMetadata.get(MANAGED_LEDGER_NAME);
            final String dataFilePath =
                    getDataFilePath(getStoragePath(storageBasePath, managedLedgerName), ledgerId, uuid);
            final String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
            MapFile.Writer dataWriter = null;
            try {
                dataWriter = new MapFile.Writer(configuration, new Path(dataFilePath),
                        MapFile.Writer.keyClass(LongWritable.class),
                        MapFile.Writer.valueClass(BytesWritable.class));

                // The ledger metadata goes first, under the reserved metadata key.
                LongWritable metadataKey = new LongWritable();
                metadataKey.set(METADATA_KEY_INDEX);
                byte[] ledgerMetadata = OffloadUtils.buildLedgerMetadataFormat(readHandle.getLedgerMetadata());
                BytesWritable metadataValue = new BytesWritable();
                metadataValue.set(ledgerMetadata, 0, ledgerMetadata.length);
                dataWriter.append(metadataKey, metadataValue);

                AtomicLong offloadedEntries = new AtomicLong(0L);
                // Bounds how many batches may be read ahead of the writer tasks.
                Semaphore prefetchPermits = new Semaphore(managedLedgerOffloadPrefetchRounds);
                long firstEntryId = 0;
                long lastEntryId;
                CountDownLatch lastBatchDone;
                do {
                    lastEntryId = readHandle.getLastAddConfirmed();
                    long batchEnd = Math.min(firstEntryId + ENTRIES_PER_READ - 1, lastEntryId);
                    log.debug("read ledger entries. start: {}, end: {}", firstEntryId, batchEnd);
                    long readStart = System.nanoTime();
                    LedgerEntries batch = readHandle.readAsync(firstEntryId, batchEnd).get();
                    lastBatchDone = new CountDownLatch(1);
                    boolean handedOff = false;
                    try {
                        offloaderStats.recordReadLedgerLatency(topicName, System.nanoTime() - readStart,
                                TimeUnit.NANOSECONDS);
                        prefetchPermits.acquire();
                        assignmentScheduler.chooseThread(ledgerId).execute(FileSystemWriter.create(
                                batch, dataWriter, prefetchPermits, lastBatchDone, offloadedEntries, this));
                        handedOff = true;
                    } finally {
                        if (!handedOff) {
                            // The writer task normally closes the batch; nobody else will if it never ran.
                            batch.close();
                        }
                    }
                    firstEntryId = batchEnd + 1;
                } while (firstEntryId <= lastEntryId && fileSystemWriteException == null);

                // All batches of a ledger are submitted to the same chooseThread(ledgerId) executor, so
                // waiting for the last submitted batch is treated as waiting for all of them.
                lastBatchDone.await();
                Exception writeFailure = fileSystemWriteException;
                if (writeFailure != null) {
                    throw writeFailure;
                }
                // Close explicitly instead of IOUtils.closeStream(): a failure to finish the file must
                // fail the offload rather than be silently ignored.
                MapFile.Writer finishedWriter = dataWriter;
                dataWriter = null;
                finishedWriter.close();
                promise.complete(null);
            } catch (Exception e) {
                log.error("Exception when get CompletableFuture<LedgerEntries> : ManagerLedgerName: {}, LedgerId: {}, UUID: {} ",
                        managedLedgerName, ledgerId, uuid, e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                releaseWriterAfterPendingBatches(dataWriter, ledgerId);
                offloaderStats.recordOffloadError(topicName);
                promise.completeExceptionally(e);
            }
        }

        /**
         * Best-effort close of the writer of a failed offload.
         *
         * <p>The close is queued on the executor that runs this ledger's writer tasks so that it happens
         * after batches that may still be pending there instead of underneath them.
         *
         * @param dataWriter writer to release; {@code null} when it was never opened or is already closed
         * @param ledgerId ledger whose writer executor is used
         */
        private void releaseWriterAfterPendingBatches(MapFile.Writer dataWriter, long ledgerId) {
            if (dataWriter == null) {
                return;
            }
            try {
                assignmentScheduler.chooseThread(ledgerId).execute(() -> IOUtils.closeStream(dataWriter));
            } catch (RuntimeException e) {
                // Presumably the scheduler was shut down and rejected the task; closing here is still
                // preferable to leaking the underlying stream.
                log.warn("Could not schedule close of offload writer for ledger {}, closing directly",
                        ledgerId, e);
                IOUtils.closeStream(dataWriter);
            }
        }
    }

    /**
     * @param str driver name to test
     * @return {@code true} when {@code str} is exactly {@code "filesystem"}
     */
    public static boolean driverSupported(String str) {
        return DRIVER_NAMES.equals(str);
    }

    /** @return the driver name taken from the offload policies at construction time */
    @Override
    public String getOffloadDriverName() {
        return this.driverName;
    }

    /**
     * Creates an offloader configured from the given policies.
     *
     * @param offloadPoliciesImpl source of the Hadoop profile paths, file system URI, driver name and
     *        thread/prefetch settings
     * @param orderedScheduler scheduler used to read ledgers and to open offloaded ledgers
     * @param ledgerOffloaderStats metrics sink
     * @return the new offloader
     * @throws IOException if the Hadoop file system cannot be obtained
     */
    public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPoliciesImpl,
            OrderedScheduler orderedScheduler, LedgerOffloaderStats ledgerOffloaderStats) throws IOException {
        return new FileSystemManagedLedgerOffloader(offloadPoliciesImpl, orderedScheduler, ledgerOffloaderStats);
    }

    private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl offloadPoliciesImpl,
            OrderedScheduler orderedScheduler, LedgerOffloaderStats ledgerOffloaderStats) throws IOException {
        this.offloadPolicies = offloadPoliciesImpl;
        String profilePaths = offloadPoliciesImpl.getFileSystemProfilePath();
        if (profilePaths != null) {
            for (String profilePath : profilePaths.split(",")) {
                this.configuration.addResource(new Path(profilePath));
            }
        }
        // An explicitly configured URI overrides whatever the profile files define.
        String fileSystemUri = offloadPoliciesImpl.getFileSystemURI();
        if (fileSystemUri != null && !fileSystemUri.isEmpty()) {
            this.configuration.set("fs.defaultFS", fileSystemUri);
        }
        // Only fill in the implementation classes when the profile files did not set them.
        if (this.configuration.get("fs.hdfs.impl") == null) {
            this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        }
        if (this.configuration.get(CommonConfigurationKeysPublic.FS_FILE_IMPL_KEY) == null) {
            this.configuration.set(CommonConfigurationKeysPublic.FS_FILE_IMPL_KEY,
                    "org.apache.hadoop.fs.LocalFileSystem");
        }
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = offloadPoliciesImpl.getManagedLedgerOffloadDriver();
        this.storageBasePath = this.configuration.get("fs.defaultFS");
        this.scheduler = orderedScheduler;
        this.fileSystem = FileSystem.get(this.configuration);
        this.assignmentScheduler = newAssignmentScheduler(offloadPoliciesImpl);
        this.offloaderStats = ledgerOffloaderStats;
    }

    /**
     * Test-only constructor that bypasses the profile files.
     *
     * @param offloadPoliciesImpl source of the driver name and thread/prefetch settings
     * @param orderedScheduler scheduler used to read ledgers and to open offloaded ledgers
     * @param str value for {@code fs.defaultFS}
     * @param str2 Hadoop temporary directory, also used as the storage base path
     * @param ledgerOffloaderStats metrics sink
     * @throws IOException if the Hadoop file system cannot be obtained
     */
    @VisibleForTesting
    public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl offloadPoliciesImpl,
            OrderedScheduler orderedScheduler, String str, String str2,
            LedgerOffloaderStats ledgerOffloaderStats) throws IOException {
        this.offloadPolicies = offloadPoliciesImpl;
        this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        this.configuration.set("fs.defaultFS", str);
        this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader());
        this.driverName = offloadPoliciesImpl.getManagedLedgerOffloadDriver();
        this.configuration.set(CommonConfigurationKeys.HADOOP_TMP_DIR, str2);
        this.storageBasePath = str2;
        this.scheduler = orderedScheduler;
        this.fileSystem = FileSystem.get(this.configuration);
        this.assignmentScheduler = newAssignmentScheduler(offloadPoliciesImpl);
        this.offloaderStats = ledgerOffloaderStats;
    }

    /** Builds the "offload-assignment" scheduler sized by the configured maximum offload threads. */
    private static OrderedScheduler newAssignmentScheduler(OffloadPoliciesImpl offloadPoliciesImpl) {
        return OrderedScheduler.newSchedulerBuilder()
                .numThreads(offloadPoliciesImpl.getManagedLedgerOffloadMaxThreads().intValue())
                .name("offload-assignment")
                .build();
    }

    /** @return a single-entry map holding the storage base path, or the string {@code "null"} if unset */
    @Override
    public Map<String, String> getOffloadDriverMetadata() {
        return ImmutableMap.of(STORAGE_BASE_PATH, this.storageBasePath == null ? "null" : this.storageBasePath);
    }

    /**
     * Schedules the copy of a ledger into its map file.
     *
     * @param readHandle ledger to offload
     * @param uuid identifier of this offload attempt; part of the data file name
     * @param map extra metadata; must contain the managed ledger name
     * @return future completed when the ledger was written, or failed with the cause
     */
    @Override
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> map) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        LedgerReader reader = new LedgerReader(readHandle, uuid, map, promise, this.storageBasePath,
                this.configuration, this.assignmentScheduler,
                this.offloadPolicies.getManagedLedgerOffloadPrefetchRounds().intValue(), this.offloaderStats);
        this.scheduler.chooseThread(readHandle.getId()).execute(reader);
        return promise;
    }

    /**
     * Opens a previously offloaded ledger for reading.
     *
     * @param j ledger id
     * @param uuid identifier of the offload attempt that produced the data file
     * @param map extra metadata; must contain the managed ledger name
     * @return future holding the read handle, or failed with whatever opening the map file threw
     */
    @Override
    public CompletableFuture<ReadHandle> readOffloaded(long j, UUID uuid, Map<String, String> map) {
        String managedLedgerName = map.get(MANAGED_LEDGER_NAME);
        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
        String dataFilePath = getDataFilePath(getStoragePath(this.storageBasePath, managedLedgerName), j, uuid);
        this.scheduler.chooseThread(j).execute(() -> {
            try {
                MapFile.Reader reader =
                        new MapFile.Reader(new Path(dataFilePath), this.configuration, new SequenceFile.Reader.Option[0]);
                promise.complete(FileStoreBackedReadHandleImpl.open(
                        this.scheduler.chooseThread(j), reader, j, this.offloaderStats, managedLedgerName));
            } catch (Throwable th) {
                log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, LegerId: {}, UUID: {}",
                        managedLedgerName, j, uuid, th);
                promise.completeExceptionally(th);
            }
        });
        return promise;
    }

    /**
     * @param str storage base path, may be {@code null}
     * @param str2 managed ledger name
     * @return the directory of the managed ledger, always ending in {@code "/"}
     */
    private static String getStoragePath(String str, String str2) {
        return str == null ? str2 + "/" : str + "/" + str2 + "/";
    }

    /**
     * @param str managed ledger directory as returned by {@link #getStoragePath(String, String)}
     * @param j ledger id
     * @param uuid identifier of the offload attempt
     * @return {@code <str><j>-<uuid>}, the location of the ledger's map file
     */
    private static String getDataFilePath(String str, long j, UUID uuid) {
        // The UUID (not the directory a second time) distinguishes offload attempts of the same ledger.
        return str + j + "-" + uuid.toString();
    }

    /**
     * Recursively deletes the map file of an offloaded ledger and records the outcome in the stats.
     * The deletion runs synchronously on the calling thread.
     *
     * @param j ledger id
     * @param uuid identifier of the offload attempt that produced the data file
     * @param map extra metadata; must contain the managed ledger name
     * @return an already completed future, failed with the {@link IOException} if the delete threw
     */
    @Override
    public CompletableFuture<Void> deleteOffloaded(long j, UUID uuid, Map<String, String> map) {
        String managedLedgerName = map.get(MANAGED_LEDGER_NAME);
        String dataFilePath = getDataFilePath(getStoragePath(this.storageBasePath, managedLedgerName), j, uuid);
        String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName);
        CompletableFuture<Void> promise = new CompletableFuture<>();
        try {
            this.fileSystem.delete(new Path(dataFilePath), true);
            promise.complete(null);
        } catch (IOException e) {
            log.error("Failed to delete Offloaded: ", e);
            promise.completeExceptionally(e);
        }
        return promise.whenComplete((ignored, failure) ->
                this.offloaderStats.recordDeleteOffloadOps(topicName, failure == null));
    }

    /** @return the offload policies this offloader was created with */
    @Override
    public OffloadPolicies getOffloadPolicies() {
        return this.offloadPolicies;
    }

    /**
     * Closes the file system (failures are logged, not thrown) and shuts down the assignment scheduler,
     * waiting up to five seconds for it to terminate.
     */
    @Override
    public void close() {
        if (this.fileSystem != null) {
            try {
                this.fileSystem.close();
            } catch (Exception e) {
                log.error("FileSystemManagedLedgerOffloader close failed!", e);
            }
        }
        if (this.assignmentScheduler != null) {
            MoreExecutors.shutdownAndAwaitTermination(this.assignmentScheduler, 5L, TimeUnit.SECONDS);
        }
    }
}
