package io.pravega.controller.task;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.controller.server.eventProcessor.requesthandlers.TaskExceptions;
import io.pravega.controller.store.index.HostIndex;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.util.RetryHelper;
import io.pravega.shared.controller.event.ControllerEvent;
import io.pravega.shared.controller.event.ControllerEventSerializer;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/task/EventHelper.class */
/**
 * Helper that records a {@link ControllerEvent} in a {@link HostIndex}, posts it through an
 * {@link EventStreamWriter}, and removes the index entry once the event has been posted.
 *
 * <p>Writes are chained off {@code writerInitFuture}; both constructors in this class complete that
 * future immediately.
 */
public class EventHelper implements AutoCloseable {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventHelper.class);
    /** Default value for {@link #completionTimeoutMillis}: two minutes, expressed in milliseconds. */
    private static final long COMPLETION_TIMEOUT_MILLIS = Duration.ofMinutes(2).toMillis();
    /** Executor handed to {@link RetryHelper} for the retry loop and the completion-polling loop. */
    private final ScheduledExecutorService executor;
    /** Executor on which the call to the event writer is dispatched. */
    private final ScheduledExecutorService eventExecutor;
    /** Key under which this instance adds and removes entries in {@link #hostTaskIndex}. */
    private final String hostId;
    private final HostIndex hostTaskIndex;
    /** Gate that every {@link #writeEvent} call is chained off before touching the writer. */
    private final CompletableFuture<Void> writerInitFuture = new CompletableFuture<>();
    private final AtomicReference<EventStreamWriter<ControllerEvent>> requestEventWriterRef = new AtomicReference<>();
    private final ControllerEventSerializer controllerEventSerializer = new ControllerEventSerializer();
    /** Timeout passed to the polling loop in {@link #checkDone(Supplier, long)}. */
    private final AtomicLong completionTimeoutMillis = new AtomicLong(COMPLETION_TIMEOUT_MILLIS);

    /**
     * Creates a helper with a ready-to-use event writer.
     *
     * @param eventStreamWriter         writer used to post events
     * @param scheduledExecutorService  executor used for retries and completion polling
     * @param scheduledExecutorService2 executor on which event writes are dispatched
     * @param str                       host id used as the key for index entries
     * @param hostIndex                 index in which pending events are recorded
     */
    public EventHelper(EventStreamWriter<ControllerEvent> eventStreamWriter, ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService2, String str, HostIndex hostIndex) {
        this.executor = scheduledExecutorService;
        this.eventExecutor = scheduledExecutorService2;
        this.hostId = str;
        this.hostTaskIndex = hostIndex;
        this.requestEventWriterRef.set(eventStreamWriter);
        this.writerInitFuture.complete(null);
    }

    /**
     * Creates a helper without an event writer; the single executor is used for both roles.
     *
     * <p>NOTE(review): {@code writerInitFuture} is completed here although no writer has been set, so
     * a {@link #writeEvent} issued before {@link #setRequestEventWriter} dereferences a null writer
     * and fails. Presumably tests always set the writer first; confirm before relying on it.
     *
     * @param scheduledExecutorService executor used for retries, polling and event writes
     * @param str                      host id used as the key for index entries
     * @param hostIndex                index in which pending events are recorded
     */
    @VisibleForTesting
    public EventHelper(ScheduledExecutorService scheduledExecutorService, String str, HostIndex hostIndex) {
        this.executor = scheduledExecutorService;
        this.eventExecutor = scheduledExecutorService;
        this.hostId = str;
        this.hostTaskIndex = hostIndex;
        this.writerInitFuture.complete(null);
    }

    /**
     * Replaces the event writer and marks writer initialization as complete.
     *
     * @param eventStreamWriter writer to use for subsequent {@link #writeEvent} calls
     */
    @VisibleForTesting
    public void setRequestEventWriter(EventStreamWriter<ControllerEvent> eventStreamWriter) {
        this.requestEventWriterRef.set(eventStreamWriter);
        this.writerInitFuture.complete(null);
    }

    /**
     * Overrides the timeout used by {@link #checkDone(Supplier, long)}.
     *
     * @param j new timeout in milliseconds
     */
    @VisibleForTesting
    public void setCompletionTimeoutMillis(long j) {
        this.completionTimeoutMillis.set(j);
    }

    /**
     * Records the event in the host index under a freshly generated random id, runs the supplied
     * task, and then posts the event.
     *
     * <p>If the task succeeds, or fails with a {@link StoreException.StoreConnectionException} or
     * {@link StoreException.WriteConflictException}, the event is posted with indefinite retries and
     * the index entry is removed afterwards; the returned future then carries the task's own outcome
     * (its result, or its failure wrapped in a {@link CompletionException}). For any other task
     * failure the event is not posted, the index entry is left in place, and the failure is
     * propagated wrapped in a {@link CompletionException}.
     *
     * @param controllerEvent event to index and post
     * @param supplier        task to run once the event has been indexed
     * @param <T>             result type of the task
     * @return future completing with the task's result, or exceptionally as described above
     */
    public <T> CompletableFuture<T> addIndexAndSubmitTask(ControllerEvent controllerEvent, Supplier<CompletableFuture<T>> supplier) {
        String taskId = UUID.randomUUID().toString();
        return addRequestToIndex(this.hostId, taskId, controllerEvent).thenCompose(indexed ->
                Futures.handleCompose(supplier.get(), (result, failure) -> {
                    if (failure != null && !isStoreConnectionOrWriteConflict(failure)) {
                        throw new CompletionException(failure);
                    }
                    return RetryHelper.withIndefiniteRetriesAsync(
                                    () -> writeEvent(controllerEvent),
                                    // Distinct name: a lambda parameter may not redeclare the enclosing lambda's parameter.
                                    writeFailure -> log.warn("writing event failed with {}", writeFailure.getMessage()),
                                    this.executor)
                            .thenCompose(posted -> removeTaskFromIndex(this.hostId, taskId))
                            .thenApply(removed -> {
                                // The event is posted either way, but the caller still sees the task's own failure.
                                if (failure != null) {
                                    throw new CompletionException(failure);
                                }
                                return result;
                            });
                }));
    }

    /** Tells whether the unwrapped failure is a store connection error or a write conflict. */
    private static boolean isStoreConnectionOrWriteConflict(Throwable failure) {
        Throwable cause = Exceptions.unwrap(failure);
        return cause instanceof StoreException.StoreConnectionException
                || cause instanceof StoreException.WriteConflictException;
    }

    /**
     * Same as {@link #checkDone(Supplier, long)} with a delay argument of {@code 100L}.
     *
     * @param supplier produces a future that yields {@code true} once the work is done
     * @return future returned by the polling loop
     */
    public CompletableFuture<Void> checkDone(Supplier<CompletableFuture<Boolean>> supplier) {
        return checkDone(supplier, 100L);
    }

    /**
     * Polls the supplied check through {@link RetryHelper#loopWithTimeout} until it yields
     * {@code true}.
     *
     * <p>The loop is given {@code j}, {@code 5000L} and the current completion timeout; presumably
     * these are the initial delay, the maximum delay and the overall timeout in milliseconds
     * (confirm against {@code RetryHelper}).
     *
     * @param supplier produces a future that yields {@code true} once the work is done
     * @param j        delay value forwarded to the polling loop
     * @return future returned by the polling loop
     */
    public CompletableFuture<Void> checkDone(Supplier<CompletableFuture<Boolean>> supplier, long j) {
        AtomicBoolean done = new AtomicBoolean(false);
        return RetryHelper.loopWithTimeout(
                () -> !done.get(),
                () -> supplier.get().thenAccept(done::set),
                j, 5000L, this.completionTimeoutMillis.get(), this.executor);
    }

    /**
     * Posts the event through the current writer, using the event's key as the first argument of
     * the writer call. The write is dispatched on the event executor once {@code writerInitFuture}
     * has completed.
     *
     * @param controllerEvent event to post
     * @return future that completes normally when the write succeeds; it fails with the
     *         {@link TaskExceptions.ProcessingDisabledException} itself when that is the underlying
     *         cause, and with a {@link TaskExceptions.PostEventException} wrapping the failure
     *         otherwise
     */
    public CompletableFuture<Void> writeEvent(ControllerEvent controllerEvent) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        this.writerInitFuture
                .thenComposeAsync(
                        ready -> this.requestEventWriterRef.get().writeEvent(controllerEvent.getKey(), controllerEvent),
                        this.eventExecutor)
                .whenComplete((ignored, th) -> {
                    if (th == null) {
                        log.info("event posted successfully");
                        result.complete(null);
                        return;
                    }
                    log.warn("exception while posting event {} {}", th.getClass().getName(), th.getMessage());
                    // A dependent stage reports failures wrapped in CompletionException, so the type
                    // test has to look at the unwrapped cause or this branch can never be taken.
                    Throwable cause = Exceptions.unwrap(th);
                    if (cause instanceof TaskExceptions.ProcessingDisabledException) {
                        result.completeExceptionally(cause);
                    } else {
                        result.completeExceptionally(new TaskExceptions.PostEventException("Failed to post event", th));
                    }
                });
        return result;
    }

    /**
     * Adds the serialized event to the host index.
     *
     * @param str             host id under which the entry is stored
     * @param str2            id of the entry
     * @param controllerEvent event whose serialized bytes form the entry's data
     * @return future returned by {@link HostIndex#addEntity}
     */
    public CompletableFuture<Void> addRequestToIndex(String str, String str2, ControllerEvent controllerEvent) {
        byte[] payload = this.controllerEventSerializer.toByteBuffer(controllerEvent).array();
        return this.hostTaskIndex.addEntity(str, str2, payload);
    }

    /**
     * Removes an entry from the host index, passing {@code true} as the third argument of
     * {@link HostIndex#removeEntity}.
     *
     * @param str  host id under which the entry is stored
     * @param str2 id of the entry
     * @return future returned by {@link HostIndex#removeEntity}
     */
    public CompletableFuture<Void> removeTaskFromIndex(String str, String str2) {
        return this.hostTaskIndex.removeEntity(str, str2, true);
    }

    /**
     * Cancels writer initialization if it is still pending and closes the writer if one is set.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (!this.writerInitFuture.isDone()) {
            this.writerInitFuture.cancel(true);
        }
        EventStreamWriter<ControllerEvent> writer = this.requestEventWriterRef.get();
        if (writer != null) {
            writer.close();
        }
    }
}
