package org.apache.kafka.server.log.remote.storage;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/server/log/remote/storage/LocalTieredStorage.class */
public final class LocalTieredStorage implements RemoteStorageManager {
    public static final String STORAGE_CONFIG_PREFIX = "remote.log.storage.local.";
    public static final String STORAGE_DIR_CONFIG = "dir";
    public static final String DELETE_ON_CLOSE_CONFIG = "delete.on.close";
    public static final String TRANSFERER_CLASS_CONFIG = "transferer.class";
    public static final String ENABLE_DELETE_API_CONFIG = "delete.enable";
    public static final String BROKER_ID = "broker.id";
    private static final String ROOT_STORAGE_DIR_NAME = "kafka-tiered-storage";
    private volatile File storageDirectory;
    private volatile boolean deleteOnClose = false;
    private volatile boolean deleteEnabled = true;
    private volatile Transferer transferer = new Transferer() { // from class: org.apache.kafka.server.log.remote.storage.LocalTieredStorage.1
        @Override // org.apache.kafka.server.log.remote.storage.Transferer
        public void transfer(File file, File file2) throws IOException {
            if (file.exists()) {
                Files.copy(file.toPath(), file2.toPath(), new CopyOption[0]);
            }
        }

        @Override // org.apache.kafka.server.log.remote.storage.Transferer
        public void transfer(ByteBuffer byteBuffer, File file) throws IOException {
            if (byteBuffer == null || !byteBuffer.hasRemaining()) {
                return;
            }
            FileOutputStream fileOutputStream = new FileOutputStream(file, false);
            Throwable th = null;
            try {
                FileChannel channel = fileOutputStream.getChannel();
                Throwable th2 = null;
                try {
                    try {
                        channel.write(byteBuffer);
                        if (channel != null) {
                            if (0 != 0) {
                                try {
                                    channel.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                channel.close();
                            }
                        }
                        if (fileOutputStream != null) {
                            if (0 == 0) {
                                fileOutputStream.close();
                                return;
                            }
                            try {
                                fileOutputStream.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        }
                    } catch (Throwable th5) {
                        th2 = th5;
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (channel != null) {
                        if (th2 != null) {
                            try {
                                channel.close();
                            } catch (Throwable th7) {
                                th2.addSuppressed(th7);
                            }
                        } else {
                            channel.close();
                        }
                    }
                    throw th6;
                }
            } catch (Throwable th8) {
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th9) {
                            th.addSuppressed(th9);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                throw th8;
            }
        }
    };
    private volatile int brokerId = -1;
    private Logger logger = LoggerFactory.getLogger(LocalTieredStorage.class);
    private final AtomicInteger eventTimestamp = new AtomicInteger(-1);
    private final LocalTieredStorageListener.LocalTieredStorageListeners storageListeners = new LocalTieredStorageListener.LocalTieredStorageListeners();
    private final LocalTieredStorageHistory history = new LocalTieredStorageHistory();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.server.log.remote.storage.LocalTieredStorage$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/server/log/remote/storage/LocalTieredStorage$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType = new int[RemoteStorageManager.IndexType.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.OFFSET.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TIMESTAMP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.TRANSACTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[RemoteStorageManager.IndexType.LEADER_EPOCH.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    public LocalTieredStorage() {
        this.history.listenTo(this);
    }

    public void traverse(LocalTieredStorageTraverser localTieredStorageTraverser) {
        Objects.requireNonNull(localTieredStorageTraverser);
        File[] listFiles = this.storageDirectory.listFiles();
        if (listFiles == null) {
            return;
        }
        Arrays.stream(listFiles).filter((v0) -> {
            return v0.isDirectory();
        }).forEach(file -> {
            RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory(file.getName(), this.storageDirectory).traverse(localTieredStorageTraverser);
        });
    }

    public void addListener(LocalTieredStorageListener localTieredStorageListener) {
        this.storageListeners.add(localTieredStorageListener);
    }

    public void configure(Map<String, ?> map) {
        if (this.storageDirectory != null) {
            throw new InvalidConfigurationException(String.format("This instance of local remote storageis already configured. The existing storage directory is %s. Ensure the method configure() is only called once.", this.storageDirectory.getAbsolutePath()));
        }
        String str = (String) map.get(STORAGE_DIR_CONFIG);
        String str2 = (String) map.get(DELETE_ON_CLOSE_CONFIG);
        String str3 = (String) map.get(TRANSFERER_CLASS_CONFIG);
        String str4 = (String) map.get(ENABLE_DELETE_API_CONFIG);
        Integer num = (Integer) map.get(BROKER_ID);
        if (num == null) {
            throw new InvalidConfigurationException("Broker ID is required to configure the LocalTieredStorage manager.");
        }
        this.brokerId = num.intValue();
        this.logger = new LogContext(String.format("[LocalTieredStorage Id=%d] ", Integer.valueOf(this.brokerId))).logger(getClass());
        if (str2 != null) {
            this.deleteOnClose = Boolean.parseBoolean(str2);
        }
        if (str4 != null) {
            this.deleteEnabled = Boolean.parseBoolean(str4);
        }
        if (str3 != null) {
            try {
                this.transferer = (Transferer) getClass().getClassLoader().loadClass(str3).newInstance();
            } catch (ClassCastException | ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                throw new RuntimeException(String.format("Cannot create transferer from class '%s'", str3), e);
            }
        }
        if (str == null) {
            this.storageDirectory = TestUtils.tempDirectory("kafka-tiered-storage-");
            this.logger.debug("No storage directory specified, created temporary directory: {}", this.storageDirectory.getAbsolutePath());
        } else {
            this.storageDirectory = new File(str, ROOT_STORAGE_DIR_NAME);
            if (Files.exists(this.storageDirectory.toPath(), new LinkOption[0])) {
                this.logger.warn("Remote storage with ID [{}] already exists on the file system. Any data already in the remote storage will not be deleted and may result in an inconsistent state and/or provide stale data.", str);
            } else {
                try {
                    this.logger.info("Creating directory: [{}]", this.storageDirectory.getAbsolutePath());
                    Files.createDirectories(this.storageDirectory.toPath(), new FileAttribute[0]);
                } catch (IOException e2) {
                    throw new RuntimeException(String.format("Not able to create the storage directory '%s'", this.storageDirectory.getAbsolutePath()), e2);
                }
            }
        }
        this.logger.info("Created local tiered storage manager [{}]:[{}]", Integer.valueOf(this.brokerId), this.storageDirectory.getName());
    }

    public Optional<RemoteLogSegmentMetadata.CustomMetadata> copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata, LogSegmentData logSegmentData) throws RemoteStorageException {
        return (Optional) wrap(() -> {
            LocalTieredStorageEvent.Builder newEventBuilder = newEventBuilder(LocalTieredStorageEvent.EventType.COPY_SEGMENT, remoteLogSegmentMetadata);
            RemoteLogSegmentFileset remoteLogSegmentFileset = null;
            try {
                remoteLogSegmentFileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, remoteLogSegmentMetadata);
                this.logger.info("Offloading log segment for {} from segment={}", remoteLogSegmentMetadata.topicIdPartition(), logSegmentData.logSegment());
                remoteLogSegmentFileset.copy(this.transferer, logSegmentData);
                this.storageListeners.onStorageEvent(newEventBuilder.withFileset(remoteLogSegmentFileset).build());
                return Optional.empty();
            } catch (Exception e) {
                if (remoteLogSegmentFileset != null) {
                    remoteLogSegmentFileset.delete();
                }
                this.storageListeners.onStorageEvent(newEventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int i) throws RemoteStorageException {
        return fetchLogSegment(remoteLogSegmentMetadata, i, remoteLogSegmentMetadata.segmentSizeInBytes());
    }

    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata, int i, int i2) throws RemoteStorageException {
        checkArgument(i >= 0, "Start position must be positive", Integer.valueOf(i));
        checkArgument(i2 >= i, "End position cannot be less than startPosition", Integer.valueOf(i), Integer.valueOf(i2));
        checkArgument(remoteLogSegmentMetadata.segmentSizeInBytes() >= i2, "End position cannot be greater than segment size", Integer.valueOf(i2), Integer.valueOf(remoteLogSegmentMetadata.segmentSizeInBytes()));
        return (InputStream) wrap(() -> {
            LocalTieredStorageEvent.Builder newEventBuilder = newEventBuilder(LocalTieredStorageEvent.EventType.FETCH_SEGMENT, remoteLogSegmentMetadata);
            newEventBuilder.withStartPosition(i).withEndPosition(i2);
            try {
                RemoteLogSegmentFileset openFileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, remoteLogSegmentMetadata);
                InputStream newInputStream = Files.newInputStream(openFileset.getFile(RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT).toPath(), StandardOpenOption.READ);
                newInputStream.skip(i);
                this.storageListeners.onStorageEvent(newEventBuilder.withFileset(openFileset).build());
                return newInputStream;
            } catch (Exception e) {
                this.storageListeners.onStorageEvent(newEventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata, RemoteStorageManager.IndexType indexType) throws RemoteStorageException {
        LocalTieredStorageEvent.EventType eventTypeForFetch = getEventTypeForFetch(indexType);
        RemoteLogSegmentFileset.RemoteLogSegmentFileType logSegmentFileType = getLogSegmentFileType(indexType);
        return (InputStream) wrap(() -> {
            LocalTieredStorageEvent.Builder newEventBuilder = newEventBuilder(eventTypeForFetch, remoteLogSegmentMetadata);
            try {
                RemoteLogSegmentFileset openFileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, remoteLogSegmentMetadata);
                File file = openFileset.getFile(logSegmentFileType);
                if (logSegmentFileType.isOptional() && !file.exists()) {
                    throw new RemoteResourceNotFoundException("Index file for type: " + indexType + " not found for segment " + remoteLogSegmentMetadata.remoteLogSegmentId());
                }
                InputStream newInputStream = Files.newInputStream(file.toPath(), StandardOpenOption.READ);
                this.storageListeners.onStorageEvent(newEventBuilder.withFileset(openFileset).build());
                return newInputStream;
            } catch (Exception e) {
                this.storageListeners.onStorageEvent(newEventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
        wrap(() -> {
            LocalTieredStorageEvent.Builder newEventBuilder = newEventBuilder(LocalTieredStorageEvent.EventType.DELETE_SEGMENT, remoteLogSegmentMetadata);
            if (!this.deleteEnabled) {
                return null;
            }
            try {
                RemoteLogSegmentFileset openFileset = RemoteLogSegmentFileset.openFileset(this.storageDirectory, remoteLogSegmentMetadata);
                if (!openFileset.delete()) {
                    throw new RemoteStorageException("Failed to delete remote log segment with id:" + remoteLogSegmentMetadata.remoteLogSegmentId());
                }
                this.storageListeners.onStorageEvent(newEventBuilder.withFileset(openFileset).build());
                return null;
            } catch (Exception e) {
                this.storageListeners.onStorageEvent(newEventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public void deletePartition(TopicIdPartition topicIdPartition) throws RemoteStorageException {
        wrap(() -> {
            LocalTieredStorageEvent.Builder newEventBuilder = newEventBuilder(LocalTieredStorageEvent.EventType.DELETE_PARTITION, topicIdPartition);
            if (!this.deleteEnabled) {
                return null;
            }
            try {
                if (!RemoteTopicPartitionDirectory.openTopicPartitionDirectory(topicIdPartition, this.storageDirectory).delete()) {
                    throw new RemoteStorageException("Failed to delete remote log partition:" + topicIdPartition);
                }
                this.storageListeners.onStorageEvent(newEventBuilder.build());
                return null;
            } catch (Exception e) {
                this.storageListeners.onStorageEvent(newEventBuilder.withException(e).build());
                throw e;
            }
        });
    }

    public void close() {
        if (this.deleteOnClose) {
            clear();
        }
    }

    public void clear() {
        try {
            File[] listFiles = this.storageDirectory.listFiles();
            Optional findAny = Arrays.stream(listFiles).filter(file -> {
                return !file.isDirectory();
            }).findAny();
            if (findAny.isPresent()) {
                this.logger.warn("Found file [{}] which is not a remote topic-partition directory. Stopping the deletion process.", findAny.get());
                return;
            }
            if (((Boolean) Arrays.stream(listFiles).map(file2 -> {
                return RemoteTopicPartitionDirectory.openExistingTopicPartitionDirectory(file2.getName(), this.storageDirectory);
            }).map((v0) -> {
                return v0.delete();
            }).reduce(true, (v0, v1) -> {
                return Boolean.logicalAnd(v0, v1);
            })).booleanValue()) {
                this.storageDirectory.delete();
            }
            File file3 = new File(ROOT_STORAGE_DIR_NAME);
            if (file3.exists() && file3.isDirectory() && file3.list().length == 0) {
                file3.delete();
            }
        } catch (Exception e) {
            this.logger.error("Error while deleting remote storage. Stopping the deletion process.", e);
        }
    }

    public LocalTieredStorageHistory getHistory() {
        return this.history;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getStorageDirectoryRoot() throws RemoteStorageException {
        return (String) wrap(() -> {
            return this.storageDirectory.getAbsolutePath();
        });
    }

    private LocalTieredStorageEvent.Builder newEventBuilder(LocalTieredStorageEvent.EventType eventType, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
        return LocalTieredStorageEvent.newBuilder(this.brokerId, eventType, this.eventTimestamp.incrementAndGet(), remoteLogSegmentMetadata.remoteLogSegmentId()).withMetadata(remoteLogSegmentMetadata);
    }

    private LocalTieredStorageEvent.Builder newEventBuilder(LocalTieredStorageEvent.EventType eventType, TopicIdPartition topicIdPartition) {
        return LocalTieredStorageEvent.newBuilder(this.brokerId, eventType, this.eventTimestamp.incrementAndGet(), new RemoteLogSegmentId(topicIdPartition, Uuid.ZERO_UUID));
    }

    private <U> U wrap(Callable<U> callable) throws RemoteStorageException {
        if (this.storageDirectory == null) {
            throw new RemoteStorageException("No storage directory was defined for the local remote storage. Make sure the instance was configured correctly via the configure() method.");
        }
        try {
            return callable.call();
        } catch (Exception e) {
            throw new RemoteStorageException("Internal error in local remote storage", e);
        } catch (RemoteStorageException e2) {
            throw e2;
        } catch (FileNotFoundException | NoSuchFileException e3) {
            throw new RemoteResourceNotFoundException(e3);
        }
    }

    private static void checkArgument(boolean z, String str, Object... objArr) {
        if (!z) {
            throw new IllegalArgumentException(str + ": " + Arrays.toString(objArr));
        }
    }

    private LocalTieredStorageEvent.EventType getEventTypeForFetch(RemoteStorageManager.IndexType indexType) {
        switch (AnonymousClass2.$SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[indexType.ordinal()]) {
            case 1:
                return LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
            case 2:
                return LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
            case 3:
                return LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
            case 4:
                return LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
            case 5:
                return LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
            default:
                return LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
        }
    }

    private RemoteLogSegmentFileset.RemoteLogSegmentFileType getLogSegmentFileType(RemoteStorageManager.IndexType indexType) {
        switch (AnonymousClass2.$SwitchMap$org$apache$kafka$server$log$remote$storage$RemoteStorageManager$IndexType[indexType.ordinal()]) {
            case 1:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.OFFSET_INDEX;
            case 2:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.TIME_INDEX;
            case 3:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.PRODUCER_SNAPSHOT;
            case 4:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.TRANSACTION_INDEX;
            case 5:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
            default:
                return RemoteLogSegmentFileset.RemoteLogSegmentFileType.SEGMENT;
        }
    }

    public int brokerId() {
        return this.brokerId;
    }
}
