package org.apache.bookkeeper.stream.storage.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/AsyncOperationProcessor.class */
/**
 * Skeleton for handling a request asynchronously against a piece of state.
 *
 * <p>The flow is fixed by {@link #processRequest}: the request is first checked with
 * {@link #verifyRequest}; if the check does not yield {@code StatusCode.SUCCESS} the response is
 * built by {@link #failRequest}, otherwise the work is delegated to {@link #doProcessRequest}.
 * Subclasses supply those three steps.
 *
 * @param <ReqT> type of the request handed to the processor
 * @param <RespT> type of the response the returned future is completed with
 * @param <StateT> type of the state object passed alongside the request
 */
public abstract class AsyncOperationProcessor<ReqT, RespT, StateT> {

    /**
     * Submits the handling of {@code reqt} to the given executor and returns a future for its
     * response.
     *
     * @param statet state passed through to the verification and processing steps
     * @param reqt request to handle
     * @param scheduledExecutorService executor the handling task is submitted to
     * @return future that {@link #processRequest} completes with the response or a failure
     */
    public CompletableFuture<RespT> process(StateT statet, ReqT reqt, ScheduledExecutorService scheduledExecutorService) {
        final CompletableFuture<RespT> result = FutureUtils.createFuture();
        // The Future returned by submit() is not kept; the outcome is reported through `result`.
        scheduledExecutorService.submit(() -> processRequest(result, statet, reqt));
        return result;
    }

    /**
     * Checks whether the request may be processed.
     *
     * @param statet state the request is checked against
     * @param reqt request to check
     * @return {@code StatusCode.SUCCESS} to let processing continue; any other code is handed to
     *     {@link #failRequest}
     */
    protected abstract StatusCode verifyRequest(StateT statet, ReqT reqt);

    /**
     * Builds the response used when verification did not return {@code StatusCode.SUCCESS}.
     *
     * @param statusCode the code returned by {@link #verifyRequest}
     * @return response the result future is completed with
     */
    protected abstract RespT failRequest(StatusCode statusCode);

    /**
     * Performs the actual work for a request that passed verification.
     *
     * @param statet state to operate on
     * @param reqt verified request
     * @return future whose outcome is copied onto the caller-facing future
     */
    protected abstract CompletableFuture<RespT> doProcessRequest(StateT statet, ReqT reqt);

    /**
     * Runs verification and, when it succeeds, the real processing, reporting the outcome on
     * {@code completableFuture}.
     *
     * <p>Anything thrown synchronously by the verification, failure-response, or processing
     * steps is caught and used to complete {@code completableFuture} exceptionally.
     *
     * @param completableFuture future to complete with the response or the failure
     * @param statet state passed to the verification and processing steps
     * @param reqt request being handled
     */
    protected void processRequest(CompletableFuture<RespT> completableFuture, StateT statet, ReqT reqt) {
        try {
            final StatusCode status = verifyRequest(statet, reqt);
            if (status != StatusCode.SUCCESS) {
                // Verification rejected the request: answer with the failure response and stop.
                completableFuture.complete(failRequest(status));
                return;
            }

            final CompletableFuture<RespT> pending = doProcessRequest(statet, reqt);
            // Mirror the delegate's outcome; the throwable is forwarded as received.
            pending.whenComplete((response, cause) -> {
                if (cause != null) {
                    completableFuture.completeExceptionally(cause);
                    return;
                }
                completableFuture.complete(response);
            });
        } catch (Throwable failure) {
            completableFuture.completeExceptionally(failure);
        }
    }
}
