package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupNotFoundException;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.TruncatedDataException;
import io.pravega.connectors.flink.serialization.DeserializerFromSchemaRegistry;
import io.pravega.connectors.flink.serialization.PravegaDeserializationSchema;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import io.pravega.connectors.flink.watermark.AssignerWithTimeWindows;
import io.pravega.shared.NameUtils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader.class */
public class FlinkPravegaReader<T> extends RichParallelSourceFunction<T> implements ResultTypeQueryable<T>, ExternallyInducedSource<T, Checkpoint> {
    private static final Logger log;
    protected static final String PRAVEGA_READER_METRICS_GROUP = "PravegaReader";
    protected static final String READER_GROUP_METRICS_GROUP = "readerGroup";
    protected static final String STREAM_METRICS_GROUP = "stream";
    protected static final String UNREAD_BYTES_METRICS_GAUGE = "unreadBytes";
    protected static final String READER_GROUP_NAME_METRICS_GAUGE = "readerGroupName";
    protected static final String SCOPE_NAME_METRICS_GAUGE = "scope";
    protected static final String ONLINE_READERS_METRICS_GAUGE = "onlineReaders";
    protected static final String STREAM_NAMES_METRICS_GAUGE = "streams";
    protected static final String SEGMENT_POSITIONS_METRICS_GAUGE = "segmentPositions";
    protected static final String SEPARATOR = ",";
    private static final long serialVersionUID = 1;
    protected transient EventStreamClientFactory eventStreamClientFactory;
    final String hookUid;
    final ClientConfig clientConfig;
    final ReaderGroupConfig readerGroupConfig;
    final String readerGroupScope;
    final String readerGroupName;
    final DeserializationSchema<T> deserializationSchema;
    final SerializedValue<AssignerWithTimeWindows<T>> assignerWithTimeWindows;
    final Time eventReadTimeout;
    final Time checkpointInitiateTimeout;
    final boolean enableMetrics;
    private transient ExternallyInducedSource.CheckpointTrigger checkpointTrigger;
    private final PravegaCollector<T> pravegaCollector;
    static final /* synthetic */ boolean $assertionsDisabled;
    protected transient ReaderGroupManager readerGroupManager = null;
    volatile boolean running = true;
    private transient ReaderGroup readerGroup = null;

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$Builder.class */
    public static class Builder<T> extends AbstractStreamingReaderBuilder<T, Builder<T>> {
        private DeserializationSchema<T> deserializationSchema;
        private SerializedValue<AssignerWithTimeWindows<T>> assignerWithTimeWindows;

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.pravega.connectors.flink.AbstractReaderBuilder
        public Builder<T> builder() {
            return this;
        }

        public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
            return builder();
        }

        public Builder<T> withDeserializationSchemaFromRegistry(String str, Class<T> cls) {
            this.deserializationSchema = new PravegaDeserializationSchema(cls, new DeserializerFromSchemaRegistry(getPravegaConfig(), str, cls));
            return builder();
        }

        public Builder<T> withTimestampAssigner(AssignerWithTimeWindows<T> assignerWithTimeWindows) {
            try {
                ClosureCleaner.clean(assignerWithTimeWindows, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
                this.assignerWithTimeWindows = new SerializedValue<>(assignerWithTimeWindows);
                return this;
            } catch (IOException e) {
                throw new IllegalArgumentException("The given assigner is not serializable", e);
            }
        }

        @Override // io.pravega.connectors.flink.AbstractStreamingReaderBuilder
        protected DeserializationSchema<T> getDeserializationSchema() {
            Preconditions.checkState(this.deserializationSchema != null, "Deserialization schema must not be null.");
            return this.deserializationSchema;
        }

        @Override // io.pravega.connectors.flink.AbstractStreamingReaderBuilder
        protected SerializedValue<AssignerWithTimeWindows<T>> getAssignerWithTimeWindows() {
            return this.assignerWithTimeWindows;
        }

        public FlinkPravegaReader<T> build() {
            return buildSourceFunction();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$OnlineReadersGauge.class */
    public static class OnlineReadersGauge implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public OnlineReadersGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m121getValue() {
            return (String) this.readerGroup.getOnlineReaders().stream().collect(Collectors.joining(FlinkPravegaReader.SEPARATOR));
        }
    }

    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$PeriodicWatermarkEmitter.class */
    private class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
        private EventStreamReader<?> pravegaReader;
        private Stream stream;
        private final SourceFunction.SourceContext<?> ctx;
        private final ProcessingTimeService timerService;
        private long lastWatermarkTimestamp = Long.MIN_VALUE;
        private AssignerWithTimeWindows<?> userAssigner;

        protected PeriodicWatermarkEmitter(EventStreamReader<?> eventStreamReader, SourceFunction.SourceContext<?> sourceContext, ClassLoader classLoader, ProcessingTimeService processingTimeService) throws Exception {
            this.pravegaReader = (EventStreamReader) Preconditions.checkNotNull(eventStreamReader);
            this.stream = Stream.of(FlinkPravegaReader.this.readerGroup.getStreamNames().iterator().next());
            this.ctx = (SourceFunction.SourceContext) Preconditions.checkNotNull(sourceContext);
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.userAssigner = (AssignerWithTimeWindows) FlinkPravegaReader.this.assignerWithTimeWindows.deserializeValue(classLoader);
        }

        protected void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + FlinkPravegaReader.this.autoWatermarkInterval(), this);
        }

        public void onProcessingTime(long j) {
            Watermark watermark = this.userAssigner.getWatermark(this.pravegaReader.getCurrentTimeWindow(this.stream));
            if (watermark != null && watermark.getTimestamp() > this.lastWatermarkTimestamp) {
                this.lastWatermarkTimestamp = watermark.getTimestamp();
                FlinkPravegaReader.log.debug("Emit watermark with timestamp: {}", Long.valueOf(watermark.getTimestamp()));
                this.ctx.emitWatermark(watermark);
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + FlinkPravegaReader.this.autoWatermarkInterval(), this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$ReaderGroupNameGauge.class */
    public static class ReaderGroupNameGauge implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public ReaderGroupNameGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m122getValue() {
            return this.readerGroup.getGroupName();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$ScopeNameGauge.class */
    public static class ScopeNameGauge implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public ScopeNameGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m123getValue() {
            return this.readerGroup.getScope();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$SegmentPositionsGauge.class */
    public static class SegmentPositionsGauge implements Gauge<String> {
        private final ReaderGroup readerGroup;
        private final String scope;
        private final String stream;

        public SegmentPositionsGauge(ReaderGroup readerGroup, String str, String str2) {
            this.readerGroup = readerGroup;
            this.scope = str;
            this.stream = str2;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m124getValue() {
            StringBuilder sb = new StringBuilder();
            sb.append("scope=").append(this.scope).append(", ");
            sb.append("stream=").append(this.stream).append(", segments={");
            this.readerGroup.getStreamCuts().entrySet().stream().filter(entry -> {
                return ((Stream) entry.getKey()).getStreamName().equals(this.stream) && ((Stream) entry.getKey()).getScope().equals(this.scope);
            }).findFirst().ifPresent(entry2 -> {
                sb.append(((StreamCut) entry2.getValue()).toString());
            });
            sb.append("}");
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$StreamNamesGauge.class */
    public static class StreamNamesGauge implements Gauge<String> {
        private final ReaderGroup readerGroup;

        public StreamNamesGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public String m125getValue() {
            return (String) this.readerGroup.getStreamNames().stream().collect(Collectors.joining(FlinkPravegaReader.SEPARATOR));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaReader$UnreadBytesGauge.class */
    public static class UnreadBytesGauge implements Gauge<Long> {
        private final ReaderGroup readerGroup;

        public UnreadBytesGauge(ReaderGroup readerGroup) {
            this.readerGroup = readerGroup;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m126getValue() {
            return Long.valueOf(this.readerGroup.getMetrics().unreadBytes());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FlinkPravegaReader(String str, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig, String str2, String str3, DeserializationSchema<T> deserializationSchema, SerializedValue<AssignerWithTimeWindows<T>> serializedValue, Time time, Time time2, boolean z) {
        this.hookUid = (String) Preconditions.checkNotNull(str, "hookUid");
        this.clientConfig = (ClientConfig) Preconditions.checkNotNull(clientConfig, "clientConfig");
        this.readerGroupConfig = (ReaderGroupConfig) Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig");
        this.readerGroupScope = (String) Preconditions.checkNotNull(str2, "readerGroupScope");
        this.readerGroupName = (String) Preconditions.checkNotNull(str3, READER_GROUP_NAME_METRICS_GAUGE);
        this.deserializationSchema = (DeserializationSchema) Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
        this.eventReadTimeout = (Time) Preconditions.checkNotNull(time, "eventReadTimeout");
        this.checkpointInitiateTimeout = (Time) Preconditions.checkNotNull(time2, "checkpointInitiateTimeout");
        this.enableMetrics = z;
        this.assignerWithTimeWindows = serializedValue;
        this.pravegaCollector = new PravegaCollector<>(deserializationSchema);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialize() {
        createEventStreamClientFactory();
        log.info("Creating reader group: {}/{} for the Flink job", this.readerGroupScope, this.readerGroupName);
        createReaderGroupManager();
        createReaderGroup();
        if (isEventTimeMode()) {
            Preconditions.checkArgument(this.readerGroup.getStreamNames().size() == 1, "Only 1 Pravega stream is allowed in the event-time mode");
        }
    }

    private boolean isEventTimeMode() {
        return this.assignerWithTimeWindows != null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long autoWatermarkInterval() {
        return getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        String readerName = FlinkPravegaUtils.getReaderName(runtimeContext.getTaskName(), runtimeContext.getIndexOfThisSubtask() + 1, runtimeContext.getNumberOfParallelSubtasks());
        log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}", new Object[]{runtimeContext.getTaskNameWithSubtasks(), readerName, this.clientConfig.getControllerURI()});
        try {
            EventStreamReader<ByteBuffer> createEventStreamReader = createEventStreamReader(readerName);
            try {
                log.info("Starting Pravega reader '{}' for controller URI {}", readerName, this.clientConfig.getControllerURI());
                AssignerWithTimeWindows<T> assignerWithTimeWindows = null;
                if (isEventTimeMode()) {
                    assignerWithTimeWindows = (AssignerWithTimeWindows) this.assignerWithTimeWindows.deserializeValue(runtimeContext.getUserCodeClassLoader());
                    PeriodicWatermarkEmitter periodicWatermarkEmitter = new PeriodicWatermarkEmitter(createEventStreamReader, sourceContext, runtimeContext.getUserCodeClassLoader(), runtimeContext.getProcessingTimeService());
                    log.info("Periodic Watermark Emitter for Reader ID: {} has started with an interval of {}", readerName, Long.valueOf(autoWatermarkInterval()));
                    periodicWatermarkEmitter.start();
                }
                while (this.running) {
                    try {
                        EventRead<ByteBuffer> readNextEvent = createEventStreamReader.readNextEvent(this.eventReadTimeout.toMilliseconds());
                        if (readNextEvent.getEvent() != null) {
                            emitEvent(readNextEvent, sourceContext, Long.MIN_VALUE, assignerWithTimeWindows);
                            if (this.pravegaCollector.isEndOfStreamSignalled()) {
                                log.info("Reached end of stream for reader: {}", readerName);
                                if (createEventStreamReader != null) {
                                    createEventStreamReader.close();
                                    return;
                                }
                                return;
                            }
                        } else if (readNextEvent.isCheckpoint()) {
                            triggerCheckpoint(readNextEvent.getCheckpointName());
                        }
                    } catch (TruncatedDataException e) {
                    }
                }
                if (createEventStreamReader != null) {
                    createEventStreamReader.close();
                }
            } finally {
            }
        } catch (RuntimeException e2) {
            log.error("Exception occurred while creating a Pravega EventStreamReader to read events", e2);
            throw e2;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:18:0x0070, code lost:
    
        throw new java.lang.AssertionError();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void emitEvent(io.pravega.client.stream.EventRead<java.nio.ByteBuffer> r6, org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T> r7, long r8, @javax.annotation.Nullable io.pravega.connectors.flink.watermark.AssignerWithTimeWindows<T> r10) throws java.io.IOException {
        /*
            r5 = this;
            r0 = r6
            java.lang.Object r0 = r0.getEvent()
            java.nio.ByteBuffer r0 = (java.nio.ByteBuffer) r0
            byte[] r0 = io.pravega.connectors.flink.util.FlinkPravegaUtils.byteBufferToArray(r0)
            r11 = r0
            r0 = r5
            org.apache.flink.api.common.serialization.DeserializationSchema<T> r0 = r0.deserializationSchema
            boolean r0 = r0 instanceof io.pravega.connectors.flink.serialization.PravegaDeserializationSchemaWithMetadata
            if (r0 == 0) goto L2c
            r0 = r5
            org.apache.flink.api.common.serialization.DeserializationSchema<T> r0 = r0.deserializationSchema
            io.pravega.connectors.flink.serialization.PravegaDeserializationSchemaWithMetadata r0 = (io.pravega.connectors.flink.serialization.PravegaDeserializationSchemaWithMetadata) r0
            r1 = r11
            r2 = r6
            r3 = r5
            io.pravega.connectors.flink.PravegaCollector<T> r3 = r3.pravegaCollector
            r0.deserialize(r1, r2, r3)
            goto L3b
        L2c:
            r0 = r5
            org.apache.flink.api.common.serialization.DeserializationSchema<T> r0 = r0.deserializationSchema
            r1 = r11
            r2 = r5
            io.pravega.connectors.flink.PravegaCollector<T> r2 = r2.pravegaCollector
            r0.deserialize(r1, r2)
        L3b:
            r0 = r5
            io.pravega.connectors.flink.PravegaCollector<T> r0 = r0.pravegaCollector
            java.util.Queue r0 = r0.getRecords()
            java.lang.Object r0 = r0.poll()
            r1 = r0
            r12 = r1
            if (r0 == 0) goto La6
            r0 = r7
            java.lang.Object r0 = r0.getCheckpointLock()
            r1 = r0
            r13 = r1
            monitor-enter(r0)
            r0 = r5
            boolean r0 = r0.isEventTimeMode()     // Catch: java.lang.Throwable -> L9b
            if (r0 == 0) goto L8d
            boolean r0 = io.pravega.connectors.flink.FlinkPravegaReader.$assertionsDisabled     // Catch: java.lang.Throwable -> L9b
            if (r0 != 0) goto L71
            r0 = r10
            if (r0 != 0) goto L71
            java.lang.AssertionError r0 = new java.lang.AssertionError     // Catch: java.lang.Throwable -> L9b
            r1 = r0
            r1.<init>()     // Catch: java.lang.Throwable -> L9b
            throw r0     // Catch: java.lang.Throwable -> L9b
        L71:
            r0 = r10
            r1 = r12
            r2 = r8
            long r0 = r0.extractTimestamp(r1, r2)     // Catch: java.lang.Throwable -> L9b
            r14 = r0
            r0 = r7
            r1 = r12
            r2 = r14
            r0.collectWithTimestamp(r1, r2)     // Catch: java.lang.Throwable -> L9b
            r0 = r14
            r8 = r0
            goto L95
        L8d:
            r0 = r7
            r1 = r12
            r0.collect(r1)     // Catch: java.lang.Throwable -> L9b
        L95:
            r0 = r13
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L9b
            goto La3
        L9b:
            r16 = move-exception
            r0 = r13
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L9b
            r0 = r16
            throw r0
        La3:
            goto L3b
        La6:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: io.pravega.connectors.flink.FlinkPravegaReader.emitEvent(io.pravega.client.stream.EventRead, org.apache.flink.streaming.api.functions.source.SourceFunction$SourceContext, long, io.pravega.connectors.flink.watermark.AssignerWithTimeWindows):void");
    }

    public void cancel() {
        this.running = false;
    }

    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    public void open(Configuration configuration) throws Exception {
        this.deserializationSchema.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext(), metricGroup -> {
            return metricGroup.addGroup("user");
        }));
        createEventStreamClientFactory();
        createReaderGroupManager();
        createReaderGroup();
        if (this.enableMetrics) {
            registerMetrics();
        }
        if (isEventTimeMode()) {
            Preconditions.checkArgument(autoWatermarkInterval() > 0, "Periodic watermark interval should be positive, please use env.getConfig().setAutoWatermarkInterval() to set a positive number. Recommended value: 10000");
        }
    }

    public void close() throws Exception {
        Throwable th = null;
        if (this.eventStreamClientFactory != null) {
            try {
                log.info("Closing Pravega eventStreamClientFactory");
                this.eventStreamClientFactory.close();
            } catch (Throwable th2) {
                if (th2 instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for eventStreamClientFactory to close, retrying ...");
                    this.eventStreamClientFactory.close();
                } else {
                    th = ExceptionUtils.firstOrSuppressed(th2, (Throwable) null);
                }
            }
        }
        if (this.readerGroupManager != null) {
            log.info("Closing Pravega ReaderGroupManager");
            try {
                this.readerGroupManager.close();
            } catch (Throwable th3) {
                if (th3 instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for ReaderGroupManager to close, retrying ...");
                    this.readerGroupManager.close();
                } else {
                    th = ExceptionUtils.firstOrSuppressed(th3, th);
                }
            }
        }
        if (this.readerGroup != null) {
            try {
                log.info("Closing Pravega ReaderGroup");
                this.readerGroup.close();
            } catch (Throwable th4) {
                if (th4 instanceof InterruptedException) {
                    log.warn("Interrupted while waiting for ReaderGroup to close, retrying ...");
                    this.readerGroup.close();
                } else {
                    th = ExceptionUtils.firstOrSuppressed(th4, th);
                }
            }
        }
        if (th != null && (th instanceof Exception)) {
            throw ((Exception) th);
        }
    }

    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        return new ReaderCheckpointHook(this.hookUid, this.readerGroupName, this.readerGroupScope, this.checkpointInitiateTimeout, this.clientConfig, this.readerGroupConfig);
    }

    public void setCheckpointTrigger(ExternallyInducedSource.CheckpointTrigger checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    private void triggerCheckpoint(String str) throws FlinkException {
        Preconditions.checkState(this.checkpointTrigger != null, "checkpoint trigger not set");
        log.debug("{} received checkpoint event for {}", getRuntimeContext().getTaskNameWithSubtasks(), str);
        try {
            this.checkpointTrigger.triggerCheckpoint(ReaderCheckpointHook.parseCheckpointId(str));
        } catch (IllegalArgumentException e) {
            throw new FlinkException("Cannot trigger checkpoint due to invalid Pravega checkpoint name", e.getCause());
        }
    }

    private void registerMetrics() {
        Preconditions.checkState(this.readerGroup != null, "Reader Group is not created");
        MetricGroup addGroup = getRuntimeContext().getMetricGroup().addGroup(PRAVEGA_READER_METRICS_GROUP).addGroup(READER_GROUP_METRICS_GROUP);
        addGroup.gauge(UNREAD_BYTES_METRICS_GAUGE, new UnreadBytesGauge(this.readerGroup));
        addGroup.gauge(READER_GROUP_NAME_METRICS_GAUGE, new ReaderGroupNameGauge(this.readerGroup));
        addGroup.gauge("scope", new ScopeNameGauge(this.readerGroup));
        addGroup.gauge(ONLINE_READERS_METRICS_GAUGE, new OnlineReadersGauge(this.readerGroup));
        addGroup.gauge(STREAM_NAMES_METRICS_GAUGE, new StreamNamesGauge(this.readerGroup));
        Iterator<String> it = this.readerGroup.getStreamNames().iterator();
        while (it.hasNext()) {
            String[] split = it.next().split("/", 2);
            Preconditions.checkArgument(split.length == 2, "not a fully qualified stream expected: scopeName/streamName");
            addGroup.addGroup("stream." + split[0] + NameUtils.INTERNAL_NAME_PREFIX + split[1]).gauge(SEGMENT_POSITIONS_METRICS_GAUGE, new SegmentPositionsGauge(this.readerGroup, split[0], split[1]));
        }
    }

    private ReaderGroup createReaderGroup() {
        try {
            this.readerGroup = this.readerGroupManager.getReaderGroup(this.readerGroupName);
        } catch (ReaderGroupNotFoundException e) {
            this.readerGroupManager.createReaderGroup(this.readerGroupName, this.readerGroupConfig);
            this.readerGroup = this.readerGroupManager.getReaderGroup(this.readerGroupName);
        }
        return this.readerGroup;
    }

    protected ReaderGroupManager createReaderGroupManager() {
        if (this.readerGroupManager == null) {
            this.readerGroupManager = ReaderGroupManager.withScope(this.readerGroupScope, this.clientConfig);
        }
        return this.readerGroupManager;
    }

    protected EventStreamClientFactory createEventStreamClientFactory() {
        if (this.eventStreamClientFactory == null) {
            this.eventStreamClientFactory = EventStreamClientFactory.withScope(this.readerGroupScope, this.clientConfig);
        }
        return this.eventStreamClientFactory;
    }

    protected EventStreamReader<ByteBuffer> createEventStreamReader(String str) {
        return FlinkPravegaUtils.createPravegaReader(str, this.readerGroupName, ReaderConfig.builder().build(), this.eventStreamClientFactory);
    }

    public static <T> Builder<T> builder() {
        return new Builder<>();
    }

    static {
        $assertionsDisabled = !FlinkPravegaReader.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(FlinkPravegaReader.class);
    }
}
