package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/connectors/flink/ReaderCheckpointHook.class */
public class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(ReaderCheckpointHook.class);
    private static final String PRAVEGA_CHECKPOINT_NAME_PREFIX = "PVG-CHK-";
    private static final int DEFAULT_CHECKPOINT_THREAD_POOL_SIZE = 3;
    protected ReaderGroup readerGroup;
    protected ReaderGroupManager readerGroupManager;
    private final String hookUid;
    private final Time triggerTimeout;
    private final ReaderGroupConfig readerGroupConfig;

    @GuardedBy("scheduledExecutorLock")
    private ScheduledExecutorService scheduledExecutorService;
    private final Object scheduledExecutorLock = new Object();
    private final CheckpointSerializer checkpointSerializer = new CheckpointSerializer();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReaderCheckpointHook(String str, String str2, String str3, Time time, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig) {
        this.hookUid = (String) Preconditions.checkNotNull(str);
        this.triggerTimeout = time;
        this.readerGroupConfig = readerGroupConfig;
        initializeReaderGroup(str2, str3, clientConfig);
    }

    protected void initializeReaderGroup(String str, String str2, ClientConfig clientConfig) {
        this.readerGroupManager = ReaderGroupManager.withScope(str2, clientConfig);
        this.readerGroupManager.createReaderGroup(str, this.readerGroupConfig);
        this.readerGroup = this.readerGroupManager.getReaderGroup(str);
    }

    public String getIdentifier() {
        return this.hookUid;
    }

    public CompletableFuture<Checkpoint> triggerCheckpoint(long j, long j2, Executor executor) throws Exception {
        ensureScheduledExecutorExists();
        CompletableFuture<Checkpoint> initiateCheckpoint = this.readerGroup.initiateCheckpoint(createCheckpointName(j), this.scheduledExecutorService);
        this.scheduledExecutorService.schedule(() -> {
            return Boolean.valueOf(initiateCheckpoint.cancel(false));
        }, this.triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        return initiateCheckpoint;
    }

    public void restoreCheckpoint(long j, Checkpoint checkpoint) throws Exception {
        if (checkpoint != null) {
            this.readerGroup.resetReaderGroup(ReaderGroupConfig.builder().maxOutstandingCheckpointRequest(this.readerGroupConfig.getMaxOutstandingCheckpointRequest()).groupRefreshTimeMillis(this.readerGroupConfig.getGroupRefreshTimeMillis()).disableAutomaticCheckpoints().startFromCheckpoint(checkpoint).build2());
        }
    }

    public void reset() {
        log.info("resetting the reader group to initial state using the RG config {}", this.readerGroupConfig);
        this.readerGroup.resetReaderGroup(this.readerGroupConfig);
    }

    public void close() {
        log.info("closing reader group Manager");
        this.readerGroupManager.close();
        log.info("closing the reader group");
        this.readerGroup.close();
        synchronized (this.scheduledExecutorLock) {
            if (this.scheduledExecutorService != null) {
                log.info("Closing Scheduled Executor for hook {}", this.hookUid);
                this.scheduledExecutorService.shutdownNow();
                this.scheduledExecutorService = null;
            }
        }
    }

    public SimpleVersionedSerializer<Checkpoint> createCheckpointDataSerializer() {
        return this.checkpointSerializer;
    }

    private void ensureScheduledExecutorExists() {
        synchronized (this.scheduledExecutorLock) {
            if (this.scheduledExecutorService == null) {
                log.info("Creating Scheduled Executor for hook {}", this.hookUid);
                this.scheduledExecutorService = createScheduledExecutorService();
            }
        }
    }

    protected ScheduledExecutorService createScheduledExecutorService() {
        return Executors.newScheduledThreadPool(3);
    }

    protected ScheduledExecutorService getScheduledExecutorService() {
        return this.scheduledExecutorService;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static long parseCheckpointId(String str) {
        Preconditions.checkArgument(str.startsWith(PRAVEGA_CHECKPOINT_NAME_PREFIX));
        try {
            return Long.parseLong(str.substring(PRAVEGA_CHECKPOINT_NAME_PREFIX.length()));
        } catch (IndexOutOfBoundsException | NumberFormatException e) {
            throw new IllegalArgumentException(e);
        }
    }

    static String createCheckpointName(long j) {
        return PRAVEGA_CHECKPOINT_NAME_PREFIX + j;
    }
}
