package io.pravega.segmentstore.storage.impl.hdfs;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.function.RunnableWithException;
import io.pravega.segmentstore.contracts.SegmentProperties;
import io.pravega.segmentstore.storage.SegmentHandle;
import io.pravega.segmentstore.storage.Storage;
import io.pravega.segmentstore.storage.impl.hdfs.FileSystemOperation;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/storage/impl/hdfs/HDFSStorage.class */
/**
 * {@link Storage} implementation that runs every segment operation as a {@link FileSystemOperation} against a
 * Hadoop {@link FileSystem}.
 *
 * <p>An instance must be initialized via {@link #initialize(long)} before any operation is invoked, and it cannot
 * be initialized twice. Every operation is scheduled on the {@link Executor} supplied at construction time and
 * reports its outcome through the returned {@link CompletableFuture}; failures are passed through
 * {@code HDFSExceptionHelpers.translateFromException} before the future is completed exceptionally.
 */
class HDFSStorage implements Storage {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(HDFSStorage.class);

    /** Executor on which every file system operation is run. */
    private final Executor executor;

    /** Configuration supplying the HDFS host URL and the data node replacement policy. */
    private final HDFSStorageConfig config;

    /** Set to true by the first call to {@link #close()}. */
    private final AtomicBoolean closed;

    /** Created by {@link #initialize(long)}; null before initialization and after a successful close. */
    private FileSystemOperation.OperationContext context;

    /**
     * Creates a new, uninitialized instance.
     *
     * @param config   The configuration to use. Must not be null.
     * @param executor The executor on which operations are run. Must not be null.
     * @throws NullPointerException If either argument is null.
     */
    public HDFSStorage(HDFSStorageConfig config, Executor executor) {
        Preconditions.checkNotNull(config, "config");
        Preconditions.checkNotNull(executor, "executor");
        this.config = config;
        this.executor = executor;
        this.closed = new AtomicBoolean(false);
    }

    /**
     * Marks this instance as closed and, if it was initialized, closes the underlying file system.
     * Only the first invocation has any effect. A failure to close the file system is logged and not rethrown.
     */
    public void close() {
        if (this.closed.getAndSet(true) || this.context == null) {
            return;
        }
        try {
            this.context.fileSystem.close();
            this.context = null;
        } catch (IOException e) {
            // Best effort: this instance is already marked as closed, so report the failure and move on.
            // The exception is passed as the trailing argument (no placeholder) so SLF4J logs its stack trace.
            log.warn("Could not close the HDFS filesystem.", e);
        }
    }

    /**
     * Builds the Hadoop configuration, opens the file system and creates the operation context that all
     * subsequent operations share.
     *
     * @param epoch The epoch to initialize with. Must be a positive number.
     * @throws IllegalStateException    If this instance has already been initialized.
     * @throws IllegalArgumentException If epoch is not positive.
     */
    public void initialize(long epoch) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(this.context == null, "HDFSStorage has already been initialized.");
        Preconditions.checkArgument(epoch > 0, "epoch must be a positive number. Given %s.", epoch);

        Configuration configuration = new Configuration();
        // "fs.default.name" is the deprecated alias of "fs.defaultFS"; both are set so that either name resolves.
        configuration.set("fs.default.name", this.config.getHdfsHostURL());
        configuration.set("fs.defaultFS", this.config.getHdfsHostURL());
        configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        configuration.set("fs.hdfs.impl.disable.cache", "true");
        if (!this.config.isReplaceDataNodesOnFailure()) {
            configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        }

        try {
            this.context = new FileSystemOperation.OperationContext(epoch, openFileSystem(configuration), this.config);
        } catch (IOException e) {
            // This method declares no checked exceptions, so the IOException is rethrown as-is (not wrapped)
            // in order for callers to observe the original exception instance.
            throw HDFSStorage.<RuntimeException>sneakyThrow(e);
        }
        log.info("Initialized (HDFSHost = '{}', Epoch = {}).", this.config.getHdfsHostURL(), epoch);
    }

    /**
     * Opens the file system described by the given configuration. Declared protected so that a subclass can
     * supply a different {@link FileSystem}.
     *
     * @param configuration The Hadoop configuration to open the file system with.
     * @return The file system returned by {@link FileSystem#get(Configuration)}.
     * @throws IOException If the file system could not be opened.
     */
    protected FileSystem openFileSystem(Configuration configuration) throws IOException {
        return FileSystem.get(configuration);
    }

    /**
     * Schedules a {@code CreateOperation} for the given segment name.
     *
     * @param streamSegmentName The name handed to the operation.
     * @param timeout           Not used by this implementation.
     * @return A future that completes with the result of the operation.
     */
    public CompletableFuture<SegmentProperties> create(String streamSegmentName, Duration timeout) {
        return supplyAsync(new CreateOperation(streamSegmentName, this.context));
    }

    /**
     * Schedules an {@code OpenWriteOperation} for the given segment name.
     *
     * @param streamSegmentName The name handed to the operation.
     * @return A future that completes with the handle produced by the operation.
     */
    public CompletableFuture<SegmentHandle> openWrite(String streamSegmentName) {
        return supplyAsync(new OpenWriteOperation(streamSegmentName, this.context));
    }

    /**
     * Schedules an {@code OpenReadOperation} for the given segment name.
     *
     * @param streamSegmentName The name handed to the operation.
     * @return A future that completes with the handle produced by the operation.
     */
    public CompletableFuture<SegmentHandle> openRead(String streamSegmentName) {
        return supplyAsync(new OpenReadOperation(streamSegmentName, this.context));
    }

    /**
     * Schedules a {@code WriteOperation} on the given handle.
     *
     * @param handle  A non-read-only {@code HDFSSegmentHandle}.
     * @param offset  Forwarded unchanged to the operation; presumably the segment offset to write at (confirm
     *                against {@code WriteOperation}).
     * @param data    The stream forwarded to the operation.
     * @param length  Forwarded unchanged to the operation; presumably the number of bytes to write (confirm
     *                against {@code WriteOperation}).
     * @param timeout Not used by this implementation.
     * @return A future that completes when the operation has run.
     * @throws IllegalArgumentException If the handle is read-only or is not an {@code HDFSSegmentHandle}.
     */
    public CompletableFuture<Void> write(SegmentHandle handle, long offset, InputStream data, int length, Duration timeout) {
        return runAsync(new WriteOperation(asWritableHandle(handle), offset, data, length, this.context));
    }

    /**
     * Schedules a {@code SealOperation} on the given handle.
     *
     * @param handle  A non-read-only {@code HDFSSegmentHandle}.
     * @param timeout Not used by this implementation.
     * @return A future that completes when the operation has run.
     * @throws IllegalArgumentException If the handle is read-only or is not an {@code HDFSSegmentHandle}.
     */
    public CompletableFuture<Void> seal(SegmentHandle handle, Duration timeout) {
        return runAsync(new SealOperation(asWritableHandle(handle), this.context));
    }

    /**
     * Schedules a {@code ConcatOperation} on the given target handle.
     *
     * @param targetHandle  A non-read-only {@code HDFSSegmentHandle}.
     * @param offset        Forwarded unchanged to the operation; presumably the target offset at which to
     *                      concatenate (confirm against {@code ConcatOperation}).
     * @param sourceSegment Forwarded unchanged to the operation; presumably the name of the source segment
     *                      (confirm against {@code ConcatOperation}).
     * @param timeout       Not used by this implementation.
     * @return A future that completes when the operation has run.
     * @throws IllegalArgumentException If the handle is read-only or is not an {@code HDFSSegmentHandle}.
     */
    public CompletableFuture<Void> concat(SegmentHandle targetHandle, long offset, String sourceSegment, Duration timeout) {
        return runAsync(new ConcatOperation(asWritableHandle(targetHandle), offset, sourceSegment, this.context));
    }

    /**
     * Schedules a {@code DeleteOperation} on the given handle. Unlike the other mutating operations, the handle
     * is only required to be an {@code HDFSSegmentHandle}; it is not checked for being read-only.
     *
     * @param handle  An {@code HDFSSegmentHandle}.
     * @param timeout Not used by this implementation.
     * @return A future that completes when the operation has run.
     * @throws IllegalArgumentException If the handle is not an {@code HDFSSegmentHandle}.
     */
    public CompletableFuture<Void> delete(SegmentHandle handle, Duration timeout) {
        return runAsync(new DeleteOperation(asReadableHandle(handle), this.context));
    }

    /**
     * Schedules a {@code ReadOperation} on the given handle.
     *
     * @param handle       An {@code HDFSSegmentHandle}.
     * @param offset       Forwarded unchanged to the operation; presumably the segment offset to read from
     *                     (confirm against {@code ReadOperation}).
     * @param buffer       The array forwarded to the operation.
     * @param bufferOffset Forwarded unchanged to the operation; presumably the start index within the buffer
     *                     (confirm against {@code ReadOperation}).
     * @param length       Forwarded unchanged to the operation; presumably the maximum number of bytes to read
     *                     (confirm against {@code ReadOperation}).
     * @param timeout      Not used by this implementation.
     * @return A future that completes with the value returned by the operation.
     * @throws IllegalArgumentException If the handle is not an {@code HDFSSegmentHandle}.
     */
    public CompletableFuture<Integer> read(SegmentHandle handle, long offset, byte[] buffer, int bufferOffset, int length, Duration timeout) {
        return supplyAsync(new ReadOperation(asReadableHandle(handle), offset, buffer, bufferOffset, length, this.context));
    }

    /**
     * Schedules a {@code GetInfoOperation} for the given segment name.
     *
     * @param streamSegmentName The name handed to the operation.
     * @param timeout           Not used by this implementation.
     * @return A future that completes with the result of the operation.
     */
    public CompletableFuture<SegmentProperties> getStreamSegmentInfo(String streamSegmentName, Duration timeout) {
        return supplyAsync(new GetInfoOperation(streamSegmentName, this.context));
    }

    /**
     * Schedules an {@code ExistsOperation} for the given segment name.
     *
     * @param streamSegmentName The name handed to the operation.
     * @param timeout           Not used by this implementation.
     * @return A future that completes with the result of the operation.
     */
    public CompletableFuture<Boolean> exists(String streamSegmentName, Duration timeout) {
        return supplyAsync(new ExistsOperation(streamSegmentName, this.context));
    }

    /**
     * Runs a result-less operation on the executor.
     *
     * @param operation The operation to run.
     * @param <T>       Type of the operation.
     * @return A future that completes with null once the operation has run, or exceptionally (with the
     * translated exception) if it threw.
     */
    private <T extends FileSystemOperation<?> & RunnableWithException> CompletableFuture<Void> runAsync(T operation) {
        ensureInitializedAndNotClosed();
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.executor.execute(() -> {
            try {
                operation.run();
                result.complete(null);
            } catch (Throwable ex) {
                handleException(ex, operation, result);
            }
        });
        return result;
    }

    /**
     * Runs a result-producing operation on the executor.
     *
     * @param operation The operation to run.
     * @param <R>       Type of the result.
     * @param <T>       Type of the operation.
     * @return A future that completes with the operation's result, or exceptionally (with the translated
     * exception) if it threw.
     */
    private <R, T extends FileSystemOperation<?> & Callable<? extends R>> CompletableFuture<R> supplyAsync(T operation) {
        ensureInitializedAndNotClosed();
        CompletableFuture<R> result = new CompletableFuture<>();
        this.executor.execute(() -> {
            try {
                result.complete(operation.call());
            } catch (Throwable ex) {
                handleException(ex, operation, result);
            }
        });
        return result;
    }

    /**
     * Fails the given future with the translated form of the given exception. The name handed to the
     * translator is the segment name when the operation's target is a {@link SegmentHandle}, and the target's
     * string form otherwise.
     */
    private void handleException(Throwable ex, FileSystemOperation<?> operation, CompletableFuture<?> result) {
        Object target = operation.getTarget();
        String segmentName = target instanceof SegmentHandle
                ? ((SegmentHandle) target).getSegmentName()
                : target.toString();
        result.completeExceptionally(HDFSExceptionHelpers.translateFromException(segmentName, ex));
    }

    /**
     * Casts the given handle to {@code HDFSSegmentHandle}, rejecting read-only handles.
     *
     * @throws IllegalArgumentException If the handle is read-only or is not an {@code HDFSSegmentHandle}.
     */
    private HDFSSegmentHandle asWritableHandle(SegmentHandle handle) {
        Preconditions.checkArgument(!handle.isReadOnly(), "handle must not be read-only.");
        return asReadableHandle(handle);
    }

    /**
     * Casts the given handle to {@code HDFSSegmentHandle}.
     *
     * @throws IllegalArgumentException If the handle is not an {@code HDFSSegmentHandle}.
     */
    private HDFSSegmentHandle asReadableHandle(SegmentHandle handle) {
        Preconditions.checkArgument(handle instanceof HDFSSegmentHandle, "handle must be of type HDFSSegmentHandle.");
        return (HDFSSegmentHandle) handle;
    }

    /**
     * Verifies that this instance is usable: not closed, and already initialized.
     *
     * @throws IllegalStateException If this instance has not been initialized.
     */
    private void ensureInitializedAndNotClosed() {
        Exceptions.checkNotClosed(this.closed.get(), this);
        Preconditions.checkState(this.context != null, "HDFSStorage is not initialized.");
    }

    /**
     * Rethrows the given throwable without wrapping it, even if it is a checked exception. The generic
     * {@code throws E} clause lets the call site pick an unchecked type for {@code E}, so the compiler does not
     * demand a {@code throws} declaration; at runtime the cast is erased and the original instance is thrown.
     *
     * @return Never returns; the return type only allows call sites to write {@code throw sneakyThrow(e)}.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }
}
