package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.connection.impl.ConnectionPool;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentMetadataClient;
import io.pravega.client.segment.impl.SegmentMetadataClientFactory;
import io.pravega.client.segment.impl.SegmentMetadataClientFactoryImpl;
import io.pravega.client.state.InitialUpdate;
import io.pravega.client.state.StateSynchronizer;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.state.Update;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.InvalidStreamException;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReaderGroupMetrics;
import io.pravega.client.stream.ReaderSegmentDistribution;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.client.stream.impl.ReaderGroupState;
import io.pravega.client.stream.notifications.EndOfDataNotification;
import io.pravega.client.stream.notifications.NotificationSystem;
import io.pravega.client.stream.notifications.NotifierFactory;
import io.pravega.client.stream.notifications.Observable;
import io.pravega.client.stream.notifications.SegmentNotification;
import io.pravega.common.concurrent.Futures;
import io.pravega.shaded.com.google.common.annotations.VisibleForTesting;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.com.google.common.collect.ImmutableMap;
import io.pravega.shared.NameUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/ReaderGroupImpl.class */
public class ReaderGroupImpl implements ReaderGroup, ReaderGroupMetrics {

    // SLF4J logger for this class (annotated as generated code).
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ReaderGroupImpl.class);
    // Suffix appended to the ids of internally generated checkpoints (see generateSilientCheckpointId).
    static final String SILENT = "_SILENT_";
    // Scope name supplied to the constructor; returned by getScope().
    private final String scope;
    // Reader group name supplied to the constructor; also used to derive the synchronizer's stream name.
    private final String groupName;
    // Controller client used for segment/successor lookups.
    private final Controller controller;
    // Factory for per-segment metadata clients (used when computing unread bytes).
    private final SegmentMetadataClientFactory metaFactory;
    // State synchronizer holding the shared ReaderGroupState.
    private final StateSynchronizer<ReaderGroupState> synchronizer;
    // Creates the segment / end-of-data notifiers on top of the synchronizer.
    private final NotifierFactory notifierFactory;

    public ReaderGroupImpl(String str, String str2, SynchronizerConfig synchronizerConfig, Serializer<InitialUpdate<ReaderGroupState>> serializer, Serializer<Update<ReaderGroupState>> serializer2, SynchronizerClientFactory synchronizerClientFactory, Controller controller, ConnectionPool connectionPool) {
        Preconditions.checkNotNull(synchronizerConfig);
        Preconditions.checkNotNull(serializer);
        Preconditions.checkNotNull(serializer2);
        Preconditions.checkNotNull(synchronizerClientFactory);
        Preconditions.checkNotNull(connectionPool);
        this.scope = (String) Preconditions.checkNotNull(str);
        this.groupName = (String) Preconditions.checkNotNull(str2);
        this.controller = (Controller) Preconditions.checkNotNull(controller);
        this.metaFactory = new SegmentMetadataClientFactoryImpl(controller, connectionPool);
        this.synchronizer = synchronizerClientFactory.createStateSynchronizer(NameUtils.getStreamForReaderGroup(str2), serializer2, serializer, synchronizerConfig);
        this.notifierFactory = new NotifierFactory(new NotificationSystem(), this.synchronizer);
    }

    /**
     * Reports a reader as shut down by delegating to
     * {@code ReaderGroupStateManager.readerShutdown} with this group's synchronizer.
     *
     * @param str      id of the reader going offline
     * @param position position handed through to the state manager
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public void readerOffline(String str, Position position) {
        final StateSynchronizer<ReaderGroupState> groupState = this.synchronizer;
        ReaderGroupStateManager.readerShutdown(str, position, groupState);
    }

    /**
     * Refreshes the synchronizer and returns the online readers recorded in the current state.
     *
     * @return the set of online reader ids
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public Set<String> getOnlineReaders() {
        final StateSynchronizer<ReaderGroupState> groupState = this.synchronizer;
        groupState.fetchUpdates();
        ReaderGroupState latest = groupState.getState();
        return latest.getOnlineReaders();
    }

    /**
     * Refreshes the synchronizer and returns the stream names recorded in the current state.
     *
     * @return the set of stream names
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public Set<String> getStreamNames() {
        final StateSynchronizer<ReaderGroupState> groupState = this.synchronizer;
        groupState.fetchUpdates();
        ReaderGroupState latest = groupState.getState();
        return latest.getStreamNames();
    }

    /**
     * Requests a new checkpoint and returns a future for its result.
     *
     * <p>The request is rejected (failed future carrying a
     * {@code MaxNumberOfCheckpointsExceededException}) when the number of outstanding
     * checkpoints already reaches the configured maximum. Otherwise a
     * {@code CreateCheckpoint} update is added and the returned future completes once the
     * checkpoint is observed as complete and its positions have been collected.
     *
     * @param str                      name of the checkpoint to create
     * @param scheduledExecutorService executor used to poll for completion
     * @return future holding the completed checkpoint, or failed if the request was rejected
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public CompletableFuture<Checkpoint> initiateCheckpoint(String str, ScheduledExecutorService scheduledExecutorService) {
        final String rejectMessage = "rejecting checkpoint request since pending checkpoint reaches max allowed limit";

        // The limit check and the update are done inside the same updateState call.
        boolean accepted = this.synchronizer.updateState((readerGroupState, list) -> {
            int maxOutstanding = readerGroupState.getConfig().getMaxOutstandingCheckpointRequest();
            List<String> outstanding = readerGroupState.getCheckpointState().getOutstandingCheckpoints();
            int currentOutstanding = outstanding.size();
            if (currentOutstanding >= maxOutstanding) {
                log.warn("Current outstanding checkpoints are : {}, maxOutstandingCheckpointRequest: {}, currentOutstandingCheckpointRequest: {}, errorMessage: {} {}", outstanding, maxOutstanding, currentOutstanding, rejectMessage, maxOutstanding);
                return false;
            }
            list.add(new ReaderGroupState.CreateCheckpoint(str));
            return true;
        });

        if (!accepted) {
            return Futures.failedFuture(new MaxNumberOfCheckpointsExceededException(rejectMessage));
        }

        return waitForCheckpointComplete(str, scheduledExecutorService)
                .thenApply(ignored -> completeCheckpoint(str))
                // Identity stage: callers get a separate future from the one running completeCheckpoint.
                // NOTE(review): presumably kept so that caller-side cancellation does not reach the inner stage - confirm.
                .thenApply(checkpoint -> checkpoint);
    }

    /**
     * Returns a future that completes once the synchronizer state reports the given
     * checkpoint as complete.
     *
     * <p>Each loop iteration schedules a task with a 500 ms delay that refreshes the
     * synchronizer and re-evaluates the completion flag; the loop ends when the flag clears.
     *
     * @param str                      name of the checkpoint to wait for
     * @param scheduledExecutorService executor used for both the delayed tasks and the loop
     * @return future completing when the checkpoint is complete
     */
    private CompletableFuture<Void> waitForCheckpointComplete(String str, ScheduledExecutorService scheduledExecutorService) {
        final AtomicBoolean pending = new AtomicBoolean(true);
        final Supplier<Boolean> stillPending = pending::get;
        final Supplier<CompletableFuture<Void>> pollOnce = () -> Futures.delayedTask(() -> {
            this.synchronizer.fetchUpdates();
            pending.set(!this.synchronizer.getState().isCheckpointComplete(str));
            if (pending.get()) {
                log.debug("Waiting on checkpoint: {} currentState is: {}", str, this.synchronizer.getState());
            }
            return null;
        }, Duration.ofMillis(500L), scheduledExecutorService);
        return Futures.loop(stillPending, pollOnce, scheduledExecutorService);
    }

    /**
     * Reads the positions recorded for a completed checkpoint, then clears that checkpoint
     * and everything before it from the shared state.
     *
     * @param str name of the completed checkpoint
     * @return the checkpoint with its segment positions
     * @throws CheckpointFailedException if no positions were available for the checkpoint
     */
    private Checkpoint completeCheckpoint(String str) {
        // Positions must be read before the clear below is applied.
        ReaderGroupState current = this.synchronizer.getState();
        Map<Segment, Long> positions = current.getPositionsForCompletedCheckpoint(str);
        this.synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(str));
        if (positions != null) {
            return new CheckpointImpl(str, positions);
        }
        throw new CheckpointFailedException("Checkpoint was cleared before results could be read.");
    }

    /**
     * Re-initialises the shared reader group state from the supplied configuration.
     *
     * <p>Start segments are resolved through the controller and end segments from the
     * configuration's ending stream cuts; the result is written unconditionally as a
     * {@code ReaderGroupStateInit} update.
     *
     * @param readerGroupConfig configuration to reset the group to
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public void resetReaderGroup(ReaderGroupConfig readerGroupConfig) {
        Map<SegmentWithRange, Long> startSegments = getSegmentsForStreams(this.controller, readerGroupConfig);
        Map<Segment, Long> endSegments = getEndSegmentsForStreams(readerGroupConfig);
        ReaderGroupState.ReaderGroupStateInit init = new ReaderGroupState.ReaderGroupStateInit(readerGroupConfig, startSegments, endSegments);
        this.synchronizer.updateStateUnconditionally(init);
    }

    /**
     * Reports how many segments each online reader holds, plus the unassigned segment count.
     *
     * <p>A reader for which the state returns no assignment map is reported with a count of 0.
     *
     * @return the per-reader segment counts and the number of unassigned segments
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public ReaderSegmentDistribution getReaderSegmentDistribution() {
        this.synchronizer.fetchUpdates();
        final ReaderGroupState snapshot = this.synchronizer.getState();

        ImmutableMap.Builder<String, Integer> perReader = ImmutableMap.builder();
        for (String reader : snapshot.getOnlineReaders()) {
            Map<SegmentWithRange, Long> assigned = snapshot.getAssignedSegments(reader);
            int count = 0;
            if (assigned != null) {
                count = assigned.size();
            }
            perReader.put(reader, count);
        }

        int unassigned = snapshot.getNumberOfUnassignedSegments();
        ImmutableMap<String, Integer> distribution = perReader.build();
        log.info("ReaderGroup {} has unassigned segments count = {} and segment distribution as {}", getGroupName(), unassigned, distribution);
        return ReaderSegmentDistribution.builder()
                .readerSegmentDistribution(distribution)
                .unassignedSegments(unassigned)
                .build();
    }

    /**
     * Resolves the starting segments and offsets for every stream in the configuration.
     *
     * <p>For a stream whose starting cut is {@code StreamCut.UNBOUNDED} the controller is asked
     * for the segments at time 0; otherwise the positions of the configured cut are used.
     * All results are merged into one map keyed by {@code SegmentWithRange} (constructed with a
     * {@code null} second argument).
     *
     * @param controller        controller used to look up segments for unbounded cuts
     * @param readerGroupConfig configuration supplying the starting stream cuts
     * @return starting offset per segment
     * @throws InvalidStreamException wrapping any failure while waiting for the lookups
     */
    @VisibleForTesting
    public static Map<SegmentWithRange, Long> getSegmentsForStreams(Controller controller, ReaderGroupConfig readerGroupConfig) {
        Map<Stream, StreamCut> startCuts = readerGroupConfig.getStartingStreamCuts();
        List<CompletableFuture<Map<Segment, Long>>> lookups = new ArrayList<>(startCuts.size());
        for (Map.Entry<Stream, StreamCut> start : startCuts.entrySet()) {
            StreamCut cut = start.getValue();
            if (cut.equals(StreamCut.UNBOUNDED)) {
                lookups.add(controller.getSegmentsAtTime(start.getKey(), 0L));
            } else {
                lookups.add(CompletableFuture.completedFuture(cut.asImpl().getPositions()));
            }
        }

        CompletableFuture<Map<SegmentWithRange, Long>> merged = Futures.allOfWithResults(lookups)
                .thenApply(perStream -> perStream.stream()
                        .flatMap(offsets -> offsets.entrySet().stream())
                        .collect(Collectors.toMap(e -> new SegmentWithRange(e.getKey(), null), Map.Entry::getValue)));
        return Futures.getAndHandleExceptions(merged, InvalidStreamException::new);
    }

    /**
     * Collects the end offsets of all bounded ending stream cuts in the configuration.
     *
     * <p>Cuts equal to {@code StreamCut.UNBOUNDED} are skipped. An offset of {@code -1} in a
     * cut is replaced by {@code Long.MAX_VALUE}.
     *
     * @param readerGroupConfig configuration supplying the ending stream cuts
     * @return end offset per segment
     */
    public static Map<Segment, Long> getEndSegmentsForStreams(ReaderGroupConfig readerGroupConfig) {
        // First materialise the positions of every bounded cut...
        List<Map<Segment, Long>> boundedPositions = readerGroupConfig.getEndingStreamCuts().entrySet().stream()
                .map(Map.Entry::getValue)
                .filter(cut -> !cut.equals(StreamCut.UNBOUNDED))
                .map(cut -> cut.asImpl().getPositions())
                .collect(Collectors.toList());

        // ...then flatten them into a single map, translating the -1 marker.
        return boundedPositions.stream()
                .flatMap(positions -> positions.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, end -> {
                    Long offset = end.getValue();
                    return offset.longValue() == -1 ? Long.valueOf(Long.MAX_VALUE) : offset;
                }));
    }

    /**
     * Returns the metrics view of this reader group; this class implements
     * {@code ReaderGroupMetrics} itself, so the same instance is returned.
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public ReaderGroupMetrics getMetrics() {
        return this;
    }

    /**
     * Computes the number of unread bytes for this reader group.
     *
     * <p>If the state holds positions for a last completed checkpoint those are used as the
     * starting point; otherwise the currently known positions are used.
     *
     * @return the number of unread bytes
     */
    @Override // io.pravega.client.stream.ReaderGroupMetrics
    public long unreadBytes() {
        this.synchronizer.fetchUpdates();
        Optional<Map<Stream, Map<Segment, Long>>> checkpointed = this.synchronizer.getState().getPositionsForLastCompletedCheckpoint();
        if (!checkpointed.isPresent()) {
            log.info("No checkpoints found, using the last known offset to compute unread bytes");
            Map<Stream, Map<SegmentWithRange, Long>> lastKnown = this.synchronizer.getState().getPositions();
            Map<Segment, Long> endSegments = this.synchronizer.getState().getEndSegments();
            return getUnreadBytesIgnoringRange(lastKnown, endSegments, this.metaFactory);
        }
        log.debug("Computing unread bytes based on the last checkPoint position");
        Map<Segment, Long> endSegments = this.synchronizer.getState().getEndSegments();
        return getUnreadBytes(checkpointed.get(), endSegments, this.metaFactory);
    }

    /**
     * Sums the remaining bytes of every stream, starting from the given per-stream positions.
     *
     * <p>One remaining-bytes request is issued per stream and the results are awaited together.
     *
     * @param map                          start positions per stream
     * @param map2                         end offsets per segment (across all streams)
     * @param segmentMetadataClientFactory factory for segment metadata clients
     * @return total remaining bytes
     * @throws RuntimeException wrapping any failure while waiting for the results
     */
    private long getUnreadBytes(Map<Stream, Map<Segment, Long>> map, Map<Segment, Long> map2, SegmentMetadataClientFactory segmentMetadataClientFactory) {
        log.debug("Compute unread bytes from position {}", map);
        List<CompletableFuture<Long>> perStream = new ArrayList<>(map.size());
        for (Map.Entry<Stream, Map<Segment, Long>> start : map.entrySet()) {
            StreamCut from = new StreamCutImpl(start.getKey(), start.getValue());
            StreamCut to = computeEndStreamCut(start.getKey(), map2);
            perStream.add(getRemainingBytes(segmentMetadataClientFactory, from, to));
        }
        CompletableFuture<Long> total = Futures.allOfWithResults(perStream)
                .thenApply(sizes -> sizes.stream().mapToLong(Long::longValue).sum());
        return Futures.getAndHandleExceptions(total, RuntimeException::new);
    }

    /**
     * Sums the remaining bytes of every stream, starting from range-qualified positions.
     *
     * <p>The range part of each key is dropped before the stream cut is built. Streams are
     * processed one after another, each result being awaited before the next request.
     *
     * @param map                          start positions per stream, keyed by segment-with-range
     * @param map2                         end offsets per segment (across all streams)
     * @param segmentMetadataClientFactory factory for segment metadata clients
     * @return total remaining bytes
     * @throws RuntimeException wrapping any failure while waiting for a result
     */
    private long getUnreadBytesIgnoringRange(Map<Stream, Map<SegmentWithRange, Long>> map, Map<Segment, Long> map2, SegmentMetadataClientFactory segmentMetadataClientFactory) {
        log.debug("Compute unread bytes from position {}", map);
        long total = 0;
        for (Map.Entry<Stream, Map<SegmentWithRange, Long>> start : map.entrySet()) {
            Stream stream = start.getKey();
            StreamCut from = new StreamCutImpl(stream, dropRange(start.getValue()));
            StreamCut to = computeEndStreamCut(stream, map2);
            CompletableFuture<Long> remaining = getRemainingBytes(segmentMetadataClientFactory, from, to);
            total += Futures.getAndHandleExceptions(remaining, RuntimeException::new);
        }
        return total;
    }

    /**
     * Re-keys a position map from {@code SegmentWithRange} to the plain {@code Segment}.
     *
     * @param map offsets keyed by segment-with-range
     * @return the same offsets keyed by segment
     */
    private Map<Segment, Long> dropRange(Map<SegmentWithRange, Long> map) {
        return map.entrySet()
                .stream()
                .collect(Collectors.toMap(withRange -> withRange.getKey().getSegment(), Map.Entry::getValue));
    }

    /**
     * Builds the end stream cut for one stream from a map of end offsets.
     *
     * @param stream stream to build the cut for
     * @param map    end offsets per segment, possibly spanning several streams
     * @return a cut over the segments belonging to {@code stream}, or
     *         {@code StreamCut.UNBOUNDED} when none of the segments belongs to it
     */
    private StreamCut computeEndStreamCut(Stream stream, Map<Segment, Long> map) {
        Map<Segment, Long> endOffsets = map.entrySet().stream()
                .filter(end -> end.getKey().getStream().equals(stream))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (endOffsets.isEmpty()) {
            return StreamCut.UNBOUNDED;
        }
        return new StreamCutImpl(stream, endOffsets);
    }

    /**
     * Computes the number of bytes between two stream cuts.
     *
     * <p>When {@code streamCut2} is {@code StreamCut.UNBOUNDED} the controller's successors of
     * {@code streamCut} are used and every segment contributes its current length. Otherwise
     * the segments between both cuts are used; a segment present in the end cut contributes
     * its end offset, any other segment its current length. The offsets of {@code streamCut}
     * are then subtracted from the total.
     *
     * @param segmentMetadataClientFactory factory for the per-segment metadata clients
     * @param streamCut                    start cut
     * @param streamCut2                   end cut, or {@code StreamCut.UNBOUNDED}
     * @return future holding the remaining byte count
     */
    private CompletableFuture<Long> getRemainingBytes(SegmentMetadataClientFactory segmentMetadataClientFactory, StreamCut streamCut, StreamCut streamCut2) {
        final CompletableFuture<StreamSegmentSuccessors> unread;
        final Map<Segment, Long> endPositions;
        if (streamCut2.equals(StreamCut.UNBOUNDED)) {
            unread = this.controller.getSuccessors(streamCut);
            endPositions = Collections.emptyMap();
        } else {
            unread = this.controller.getSegments(streamCut, streamCut2);
            endPositions = streamCut2.asImpl().getPositions();
        }
        return unread.thenApply(successors -> {
            long total = 0;
            // Created on first use and reused for every following segment.
            DelegationTokenProvider tokenProvider = null;
            for (Segment segment : successors.getSegments()) {
                if (endPositions.containsKey(segment)) {
                    total += endPositions.get(segment);
                    continue;
                }
                if (tokenProvider == null) {
                    tokenProvider = DelegationTokenProviderFactory.create(successors.getDelegationToken(), this.controller, segment);
                }
                SegmentMetadataClient metadataClient = segmentMetadataClientFactory.createSegmentMetadataClient(segment, tokenProvider);
                try {
                    total += metadataClient.fetchCurrentSegmentLength();
                } finally {
                    // Close exactly once on both the normal and the exceptional path.
                    if (metadataClient != null) {
                        metadataClient.close();
                    }
                }
            }
            for (long alreadyRead : streamCut.asImpl().getPositions().values()) {
                total -= alreadyRead;
            }
            log.debug("Remaining bytes from position: {} to position: {} is {}", streamCut, streamCut2, total);
            return total;
        });
    }

    /**
     * Returns the segment notifier obtained from the notifier factory.
     *
     * @param scheduledExecutorService executor handed to the notifier factory; must not be null
     * @return observable for segment notifications
     * @throws NullPointerException if the executor is {@code null}
     */
    @Override // io.pravega.client.stream.notifications.ReaderGroupNotificationListener
    public Observable<SegmentNotification> getSegmentNotifier(ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(scheduledExecutorService, "executor");
        final NotifierFactory notifiers = this.notifierFactory;
        return notifiers.getSegmentNotifier(scheduledExecutorService);
    }

    /**
     * Returns the end-of-data notifier obtained from the notifier factory.
     *
     * @param scheduledExecutorService executor handed to the notifier factory; must not be null
     * @return observable for end-of-data notifications
     * @throws NullPointerException if the executor is {@code null}
     */
    @Override // io.pravega.client.stream.notifications.ReaderGroupNotificationListener
    public Observable<EndOfDataNotification> getEndOfDataNotifier(ScheduledExecutorService scheduledExecutorService) {
        Preconditions.checkNotNull(scheduledExecutorService, "executor");
        final NotifierFactory notifiers = this.notifierFactory;
        return notifiers.getEndOfDataNotifier(scheduledExecutorService);
    }

    /**
     * Refreshes the synchronizer and converts the currently known positions into one
     * stream cut per stream (the range part of each segment key is dropped).
     *
     * @return stream cut per stream
     */
    @Override // io.pravega.client.stream.ReaderGroup
    @VisibleForTesting
    public Map<Stream, StreamCut> getStreamCuts() {
        this.synchronizer.fetchUpdates();
        ReaderGroupState latest = this.synchronizer.getState();
        Map<Stream, StreamCut> cuts = new HashMap<>();
        latest.getPositions().forEach((stream, offsets) -> cuts.put(stream, new StreamCutImpl(stream, dropRange(offsets))));
        return cuts;
    }

    /**
     * Generates stream cuts by creating an internal checkpoint with a generated id.
     *
     * <p>The checkpoint is added unconditionally; once it is observed as complete its stream
     * cuts are fetched and the checkpoint is cleared.
     *
     * @param scheduledExecutorService executor used to poll for checkpoint completion
     * @return future holding the stream cut of every stream
     */
    @Override // io.pravega.client.stream.ReaderGroup
    public CompletableFuture<Map<Stream, StreamCut>> generateStreamCuts(ScheduledExecutorService scheduledExecutorService) {
        final String silentId = generateSilientCheckpointId();
        log.debug("Fetching the current StreamCut using id {}", silentId);
        ReaderGroupState.CreateCheckpoint request = new ReaderGroupState.CreateCheckpoint(silentId);
        this.synchronizer.updateStateUnconditionally(request);
        CompletableFuture<Void> completed = waitForCheckpointComplete(silentId, scheduledExecutorService);
        return completed.thenApply(ignored -> completeCheckpointAndFetchStreamCut(silentId));
    }

    /**
     * Creates an id for an internal checkpoint: 32 random bytes, Base64-encoded, followed by
     * the {@code SILENT} suffix.
     *
     * @return the generated checkpoint id
     */
    private String generateSilientCheckpointId() {
        final byte[] randomBytes = new byte[32];
        ThreadLocalRandom.current().nextBytes(randomBytes);
        String encoded = Base64.getEncoder().encodeToString(randomBytes);
        return encoded + SILENT;
    }

    /**
     * Reads the stream cuts recorded for a completed checkpoint, then clears that checkpoint
     * and everything before it from the shared state.
     *
     * @param str id of the completed checkpoint
     * @return stream cut per stream for the checkpoint
     * @throws CheckpointFailedException if no stream cuts were available for the checkpoint
     */
    private Map<Stream, StreamCut> completeCheckpointAndFetchStreamCut(String str) {
        // Stream cuts must be read before the clear below is applied.
        ReaderGroupState current = this.synchronizer.getState();
        Optional<Map<Stream, StreamCut>> cuts = current.getStreamCutsForCompletedCheckpoint(str);
        this.synchronizer.updateStateUnconditionally(new ReaderGroupState.ClearCheckpointsBefore(str));
        if (cuts.isPresent()) {
            return cuts.get();
        }
        throw new CheckpointFailedException("Internal CheckPoint was cleared before results could be read.");
    }

    /**
     * Closes the underlying state synchronizer.
     */
    @Override // io.pravega.client.stream.ReaderGroup, java.lang.AutoCloseable
    public void close() {
        this.synchronizer.close();
    }

    /**
     * Returns the scope name this reader group was constructed with.
     */
    @Override // io.pravega.client.stream.ReaderGroup
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String getScope() {
        return this.scope;
    }

    /**
     * Returns the reader group name this instance was constructed with.
     */
    @Override // io.pravega.client.stream.ReaderGroup
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String getGroupName() {
        return this.groupName;
    }

    /**
     * Returns the controller client held by this reader group.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public Controller getController() {
        return this.controller;
    }

    /**
     * Returns the segment metadata client factory created in the constructor.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public SegmentMetadataClientFactory getMetaFactory() {
        return this.metaFactory;
    }

    /**
     * Returns the state synchronizer backing this reader group.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public StateSynchronizer<ReaderGroupState> getSynchronizer() {
        return this.synchronizer;
    }

    /**
     * Returns the notifier factory created in the constructor.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public NotifierFactory getNotifierFactory() {
        return this.notifierFactory;
    }

    /**
     * Compares this reader group with another object.
     *
     * <p>Two instances are equal when the other object is a {@code ReaderGroupImpl} that
     * accepts this instance via {@code canEqual} and all six fields (scope, group name,
     * controller, metadata factory, synchronizer, notifier factory) are equal.
     *
     * @param obj object to compare with
     * @return {@code true} if both objects are equal
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof ReaderGroupImpl)) {
            return false;
        }
        final ReaderGroupImpl that = (ReaderGroupImpl) obj;
        // Fields are compared in declaration order; evaluation stops at the first mismatch.
        return that.canEqual(this)
                && java.util.Objects.equals(getScope(), that.getScope())
                && java.util.Objects.equals(getGroupName(), that.getGroupName())
                && java.util.Objects.equals(getController(), that.getController())
                && java.util.Objects.equals(getMetaFactory(), that.getMetaFactory())
                && java.util.Objects.equals(getSynchronizer(), that.getSynchronizer())
                && java.util.Objects.equals(getNotifierFactory(), that.getNotifierFactory());
    }

    /**
     * Tells whether the given object may be compared with this class in {@code equals}:
     * true for any {@code ReaderGroupImpl} instance.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    protected boolean canEqual(Object obj) {
        return obj instanceof ReaderGroupImpl;
    }

    /**
     * Computes a hash over the same six fields used by {@code equals}.
     *
     * <p>Starting from 1, each field folds in as {@code result * 59 + fieldHash}, where a
     * {@code null} field contributes 43.
     *
     * @return the hash code
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public int hashCode() {
        final Object[] parts = {
            getScope(),
            getGroupName(),
            getController(),
            getMetaFactory(),
            getSynchronizer(),
            getNotifierFactory()
        };
        int result = 1;
        for (Object part : parts) {
            int partHash = 43;
            if (part != null) {
                partHash = part.hashCode();
            }
            result = (result * 59) + partHash;
        }
        return result;
    }

    /**
     * Renders all six fields as {@code ReaderGroupImpl(name=value, ...)}.
     *
     * @return the string form of this reader group
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String toString() {
        StringBuilder text = new StringBuilder("ReaderGroupImpl(");
        text.append("scope=").append(getScope());
        text.append(", groupName=").append(getGroupName());
        text.append(", controller=").append(getController());
        text.append(", metaFactory=").append(getMetaFactory());
        text.append(", synchronizer=").append(getSynchronizer());
        text.append(", notifierFactory=").append(getNotifierFactory());
        text.append(")");
        return text.toString();
    }
}
