package io.pravega.segmentstore.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.MultiKeySequentialProcessor;
import io.pravega.common.function.RunnableWithException;
import io.pravega.segmentstore.contracts.SegmentProperties;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
/* loaded from: input_file:io/pravega/segmentstore/storage/AsyncStorageWrapper.class */
public class AsyncStorageWrapper implements Storage {
    private final SyncStorage syncStorage;
    private final Executor executor;
    private final MultiKeySequentialProcessor<String> taskProcessor;
    private final AtomicBoolean closed = new AtomicBoolean();

    public AsyncStorageWrapper(SyncStorage syncStorage, Executor executor) {
        this.syncStorage = (SyncStorage) Preconditions.checkNotNull(syncStorage, "syncStorage");
        this.executor = (Executor) Preconditions.checkNotNull(executor, "executor");
        this.taskProcessor = new MultiKeySequentialProcessor<>(this.executor);
    }

    @Override // io.pravega.segmentstore.storage.Storage, io.pravega.segmentstore.storage.ReadOnlyStorage, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.syncStorage.close();
    }

    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public void initialize(long j) {
        this.syncStorage.initialize(j);
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<SegmentHandle> openWrite(String str) {
        return supplyAsync(() -> {
            return this.syncStorage.openWrite(str);
        }, str);
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<SegmentHandle> create(String str, SegmentRollingPolicy segmentRollingPolicy, Duration duration) {
        return supplyAsync(() -> {
            return this.syncStorage.create(str, segmentRollingPolicy);
        }, str);
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> write(SegmentHandle segmentHandle, long j, InputStream inputStream, int i, Duration duration) {
        return runAsync(() -> {
            this.syncStorage.write(segmentHandle, j, inputStream, i);
        }, segmentHandle.getSegmentName());
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> seal(SegmentHandle segmentHandle, Duration duration) {
        return runAsync(() -> {
            this.syncStorage.seal(segmentHandle);
        }, segmentHandle.getSegmentName());
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> concat(SegmentHandle segmentHandle, long j, String str, Duration duration) {
        return runAsync(() -> {
            this.syncStorage.concat(segmentHandle, j, str);
        }, segmentHandle.getSegmentName(), str);
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> delete(SegmentHandle segmentHandle, Duration duration) {
        return runAsync(() -> {
            this.syncStorage.delete(segmentHandle);
        }, segmentHandle.getSegmentName());
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public CompletableFuture<Void> truncate(SegmentHandle segmentHandle, long j, Duration duration) {
        return runAsync(() -> {
            this.syncStorage.truncate(segmentHandle, j);
        }, segmentHandle.getSegmentName());
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public boolean supportsTruncation() {
        return this.syncStorage.supportsTruncation();
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public boolean supportsAtomicWrites() {
        return false;
    }

    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<SegmentHandle> openRead(String str) {
        return supplyAsync(() -> {
            return this.syncStorage.openRead(str);
        }, str);
    }

    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<Integer> read(SegmentHandle segmentHandle, long j, byte[] bArr, int i, int i2, Duration duration) {
        return supplyAsync(() -> {
            return Integer.valueOf(this.syncStorage.read(segmentHandle, j, bArr, i, i2));
        }, segmentHandle.getSegmentName());
    }

    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<SegmentProperties> getStreamSegmentInfo(String str, Duration duration) {
        return supplyAsync(() -> {
            return this.syncStorage.getStreamSegmentInfo(str);
        }, str);
    }

    @Override // io.pravega.segmentstore.storage.ReadOnlyStorage
    public CompletableFuture<Boolean> exists(String str, Duration duration) {
        return supplyAsync(() -> {
            return Boolean.valueOf(this.syncStorage.exists(str));
        }, str);
    }

    @Override // io.pravega.segmentstore.storage.Storage
    public Iterator<SegmentProperties> listSegments() throws IOException {
        return this.syncStorage.listSegments();
    }

    @VisibleForTesting
    int getSegmentWithOngoingOperationsCount() {
        return this.taskProcessor.getCurrentTaskCount();
    }

    private <R> CompletableFuture<R> supplyAsync(Callable<R> callable, String... strArr) {
        Exceptions.checkNotClosed(this.closed.get(), this);
        return this.taskProcessor.add(Arrays.asList(strArr), () -> {
            return execute(callable);
        });
    }

    private CompletableFuture<Void> runAsync(RunnableWithException runnableWithException, String... strArr) {
        return supplyAsync(() -> {
            runnableWithException.run();
            return null;
        }, strArr);
    }

    private <R> CompletableFuture<R> execute(Callable<R> callable) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, this.executor);
    }
}
