package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateDirectory.class */
public class StateDirectory {
    private static final Pattern PATH_NAME = Pattern.compile("\\d+_\\d+");
    private static final Logger log = LoggerFactory.getLogger((Class<?>) StateDirectory.class);
    static final String LOCK_FILE_NAME = ".lock";
    private final Time time;
    private final String appId;
    private final File stateDir;
    private final boolean hasPersistentStores;
    private FileChannel globalStateChannel;
    private FileLock globalStateLock;
    private final Object taskDirCreationLock = new Object();
    private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
    private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StateDirectory$LockAndOwner.class */
    public static class LockAndOwner {
        final FileLock lock;
        final String owningThread;

        LockAndOwner(String str, FileLock fileLock) {
            this.owningThread = str;
            this.lock = fileLock;
        }
    }

    public StateDirectory(StreamsConfig streamsConfig, Time time, boolean z) {
        this.time = time;
        this.hasPersistentStores = z;
        this.appId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
        String string = streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG);
        File file = new File(string);
        if (this.hasPersistentStores && !file.exists() && !file.mkdirs()) {
            throw new ProcessorStateException(String.format("base state directory [%s] doesn't exist and couldn't be created", string));
        }
        this.stateDir = new File(file, this.appId);
        if (this.hasPersistentStores && !this.stateDir.exists() && !this.stateDir.mkdir()) {
            throw new ProcessorStateException(String.format("state directory [%s] doesn't exist and couldn't be created", this.stateDir.getPath()));
        }
        if (z && string.startsWith("/tmp")) {
            log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS");
        }
    }

    public File directoryForTask(TaskId taskId) {
        File file = new File(this.stateDir, taskId.toString());
        if (this.hasPersistentStores && !file.exists()) {
            synchronized (this.taskDirCreationLock) {
                if (!file.exists() && !file.mkdir()) {
                    throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", file.getPath()));
                }
            }
        }
        return file;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File checkpointFileFor(TaskId taskId) {
        return new File(directoryForTask(taskId), ".checkpoint");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean directoryForTaskIsEmpty(TaskId taskId) {
        return taskDirEmpty(directoryForTask(taskId));
    }

    private boolean taskDirEmpty(File file) {
        File[] listFiles = file.listFiles(file2 -> {
            return (file2.getName().equals(LOCK_FILE_NAME) || file2.getName().equals(".checkpoint")) ? false : true;
        });
        return listFiles == null || listFiles.length == 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File globalStateDir() {
        File file = new File(this.stateDir, "global");
        if (!this.hasPersistentStores || file.exists() || file.mkdir()) {
            return file;
        }
        throw new ProcessorStateException(String.format("global state directory [%s] doesn't exist and couldn't be created", file.getPath()));
    }

    private String logPrefix() {
        return String.format("stream-thread [%s]", Thread.currentThread().getName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean lock(TaskId taskId) throws IOException {
        if (!this.hasPersistentStores) {
            return true;
        }
        LockAndOwner lockAndOwner = this.locks.get(taskId);
        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
            return true;
        }
        if (lockAndOwner != null) {
            return false;
        }
        try {
            try {
                FileLock tryLock = tryLock(getOrCreateFileChannel(taskId, new File(directoryForTask(taskId), LOCK_FILE_NAME).toPath()));
                if (tryLock != null) {
                    this.locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), tryLock));
                    log.debug("{} Acquired state dir lock for task {}", logPrefix(), taskId);
                }
                return tryLock != null;
            } catch (NoSuchFileException e) {
                return false;
            }
        } catch (ProcessorStateException e2) {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean lockGlobalState() throws IOException {
        if (!this.hasPersistentStores) {
            return true;
        }
        if (this.globalStateLock != null) {
            log.trace("{} Found cached state dir lock for the global task", logPrefix());
            return true;
        }
        try {
            FileChannel open = FileChannel.open(new File(globalStateDir(), LOCK_FILE_NAME).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileLock tryLock = tryLock(open);
            if (tryLock == null) {
                open.close();
                return false;
            }
            this.globalStateChannel = open;
            this.globalStateLock = tryLock;
            log.debug("{} Acquired global state dir lock", logPrefix());
            return true;
        } catch (NoSuchFileException e) {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unlockGlobalState() throws IOException {
        if (this.globalStateLock == null) {
            return;
        }
        this.globalStateLock.release();
        this.globalStateChannel.close();
        this.globalStateLock = null;
        this.globalStateChannel = null;
        log.debug("{} Released global state dir lock", logPrefix());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unlock(TaskId taskId) throws IOException {
        LockAndOwner lockAndOwner = this.locks.get(taskId);
        if (lockAndOwner == null || !lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
            return;
        }
        this.locks.remove(taskId);
        lockAndOwner.lock.release();
        log.debug("{} Released state dir lock for task {}", logPrefix(), taskId);
        FileChannel remove = this.channels.remove(taskId);
        if (remove != null) {
            remove.close();
        }
    }

    public synchronized void clean() {
        try {
            cleanRemovedTasksCalledByUser();
            try {
                if (this.stateDir.exists()) {
                    Utils.delete(globalStateDir().getAbsoluteFile());
                }
            } catch (IOException e) {
                log.error(String.format("%s Failed to delete global state directory of %s due to an unexpected exception", logPrefix(), this.appId), (Throwable) e);
                throw new StreamsException(e);
            }
        } catch (Exception e2) {
            throw new StreamsException(e2);
        }
    }

    public synchronized void cleanRemovedTasks(long j) {
        try {
            cleanRemovedTasksCalledByCleanerThread(j);
        } catch (Exception e) {
            throw new IllegalStateException("Should have swallowed exception.", e);
        }
    }

    private void cleanRemovedTasksCalledByCleanerThread(long j) {
        for (File file : listNonEmptyTaskDirectories()) {
            String name = file.getName();
            TaskId parse = TaskId.parse(name);
            if (!this.locks.containsKey(parse)) {
                try {
                    try {
                        if (lock(parse)) {
                            long milliseconds = this.time.milliseconds();
                            long lastModified = file.lastModified();
                            if (milliseconds > lastModified + j) {
                                log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", logPrefix(), name, parse, Long.valueOf(milliseconds - lastModified), Long.valueOf(j));
                                Utils.delete(file, Collections.singletonList(new File(file, LOCK_FILE_NAME)));
                            }
                        }
                        try {
                            unlock(parse);
                        } catch (IOException e) {
                            log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", logPrefix(), name, parse), (Throwable) e);
                        }
                    } catch (Throwable th) {
                        try {
                            unlock(parse);
                        } catch (IOException e2) {
                            log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", logPrefix(), name, parse), (Throwable) e2);
                        }
                        throw th;
                    }
                } catch (IOException | OverlappingFileLockException e3) {
                    log.warn(String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:", logPrefix(), name, parse), e3);
                    try {
                        unlock(parse);
                    } catch (IOException e4) {
                        log.warn(String.format("%s Swallowed the following exception during unlocking after deletion of obsolete state directory %s for task %s:", logPrefix(), name, parse), (Throwable) e4);
                    }
                }
            }
        }
    }

    private void cleanRemovedTasksCalledByUser() throws Exception {
        for (File file : listAllTaskDirectories()) {
            String name = file.getName();
            TaskId parse = TaskId.parse(name);
            if (!this.locks.containsKey(parse)) {
                try {
                    try {
                        if (lock(parse)) {
                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.", logPrefix(), name, parse);
                            Utils.delete(file, Collections.singletonList(new File(file, LOCK_FILE_NAME)));
                        } else {
                            log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.", logPrefix(), name, parse);
                        }
                        try {
                            unlock(parse);
                            Utils.delete(file);
                        } catch (IOException e) {
                            log.error(String.format("%s Failed to release lock on state directory %s for task %s with exception:", logPrefix(), name, parse), (Throwable) e);
                            throw e;
                        }
                    } catch (IOException | OverlappingFileLockException e2) {
                        log.error(String.format("%s Failed to delete state directory %s for task %s with exception:", logPrefix(), name, parse), e2);
                        throw e2;
                    }
                } catch (Throwable th) {
                    try {
                        unlock(parse);
                        Utils.delete(file);
                        throw th;
                    } catch (IOException e3) {
                        log.error(String.format("%s Failed to release lock on state directory %s for task %s with exception:", logPrefix(), name, parse), (Throwable) e3);
                        throw e3;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File[] listNonEmptyTaskDirectories() {
        File[] listFiles = (this.hasPersistentStores && this.stateDir.exists()) ? this.stateDir.listFiles(file -> {
            return file.isDirectory() && PATH_NAME.matcher(file.getName()).matches() && !taskDirEmpty(file);
        }) : new File[0];
        return listFiles == null ? new File[0] : listFiles;
    }

    File[] listAllTaskDirectories() {
        File[] listFiles = (this.hasPersistentStores && this.stateDir.exists()) ? this.stateDir.listFiles(file -> {
            return file.isDirectory() && PATH_NAME.matcher(file.getName()).matches();
        }) : new File[0];
        return listFiles == null ? new File[0] : listFiles;
    }

    private FileChannel getOrCreateFileChannel(TaskId taskId, Path path) throws IOException {
        if (!this.channels.containsKey(taskId)) {
            this.channels.put(taskId, FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
        }
        return this.channels.get(taskId);
    }

    private FileLock tryLock(FileChannel fileChannel) throws IOException {
        try {
            return fileChannel.tryLock();
        } catch (OverlappingFileLockException e) {
            return null;
        }
    }
}
