package io.pravega.controller.server.bucket;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientConfig;
import io.pravega.client.SynchronizerClientFactory;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.state.Revision;
import io.pravega.client.state.RevisionedStreamClient;
import io.pravega.client.state.SynchronizerConfig;
import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.watermark.WatermarkSerializer;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.tracing.TagLogger;
import io.pravega.controller.store.stream.BucketStore;
import io.pravega.controller.store.stream.OperationContext;
import io.pravega.controller.store.stream.StoreException;
import io.pravega.controller.store.stream.StreamMetadataStore;
import io.pravega.controller.store.stream.records.EpochRecord;
import io.pravega.controller.store.stream.records.StreamSegmentRecord;
import io.pravega.controller.store.stream.records.WriterMark;
import io.pravega.shared.NameUtils;
import io.pravega.shared.watermarks.SegmentWithRange;
import io.pravega.shared.watermarks.Watermark;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.ParametersAreNonnullByDefault;
import lombok.Generated;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/controller/server/bucket/PeriodicWatermarking.class */
public class PeriodicWatermarking {
    // Class logger; assigned in the static initializer at the bottom of the class.
    private static final TagLogger log;
    // Intended upper bound on the number of entries held by each cache below.
    private static final int MAX_CACHE_SIZE = 500;
    // Metadata store used to read writer marks, stream configuration, segments and epochs.
    private final StreamMetadataStore streamMetadataStore;
    // Bucket store used to remove a stream from the watermarking service when it has no active writers.
    private final BucketStore bucketStore;
    // Executor handed to every asynchronous store call and to the stream-cut loop.
    private final ScheduledExecutorService executor;
    // One watermark client per stream; entries are closed when they are removed from the cache.
    private final LoadingCache<Stream, WatermarkClient> watermarkClientCache;
    // One synchronizer client factory per scope; entries are closed when they are removed from the cache.
    private final LoadingCache<String, SynchronizerClientFactory> syncFactoryCache;
    // Explicit copy of the flag the compiler normally synthesizes for `assert`; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Per-stream helper that wraps the {@link RevisionedStreamClient} of the stream's mark stream and keeps,
     * between watermarking iterations, the last watermark read from it, the revision the mark points at,
     * and the time at which each non-participating writer was first observed.
     */
    public static class WatermarkClient implements Closeable {
        private final RevisionedStreamClient<Watermark> client;

        // Monitor guarding reinitialize().
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        private final Object $lock = new Object[0];
        // Most recent (revision, watermark) pair read during reinitialize(); null when none was found.
        private final AtomicReference<Map.Entry<Revision, Watermark>> previousWatermark = new AtomicReference<>();
        // Revision the mark pointed at (or was initialized to) during the last reinitialize().
        private final AtomicReference<Revision> markRevision = new AtomicReference<>();
        // Writer id -> wall-clock millis at which the writer was first seen not participating.
        private final ConcurrentHashMap<String, Long> inactiveWriters = new ConcurrentHashMap<>();

        /**
         * Creates the revisioned stream client for the mark stream that belongs to {@code stream}.
         *
         * @param stream                    stream whose mark stream is to be read and written.
         * @param synchronizerClientFactory factory used to create the revisioned stream client.
         */
        @VisibleForTesting
        WatermarkClient(Stream stream, SynchronizerClientFactory synchronizerClientFactory) {
            this.client = synchronizerClientFactory.createRevisionedStreamClient(
                    NameUtils.getMarkStreamForStream(stream.getStreamName()),
                    new WatermarkSerializer(),
                    SynchronizerConfig.builder().build());
        }

        /**
         * @return the watermark captured by the last {@link #reinitialize()}, or {@link Watermark#EMPTY}
         *         when none was captured.
         */
        Watermark getPreviousWatermark() {
            Map.Entry<Revision, Watermark> previous = previousWatermark.get();
            if (previous == null) {
                return Watermark.EMPTY;
            }
            return previous.getValue();
        }

        /**
         * Refreshes the cached mark revision and previous watermark from the mark stream.
         * If the stream has no mark yet, the mark is set to the oldest revision.
         * The previous watermark becomes the last entry readable from the mark revision, or null if there is none.
         */
        void reinitialize() {
            synchronized (this.$lock) {
                Revision mark = client.getMark();
                if (mark == null) {
                    // No mark yet: start from the oldest revision and try to record it as the mark.
                    markRevision.set(client.fetchOldestRevision());
                    client.compareAndSetMark((Revision) null, markRevision.get());
                } else {
                    markRevision.set(mark);
                }
                List<Map.Entry<Revision, Watermark>> entries =
                        Lists.newArrayList(client.readFrom(markRevision.get()));
                previousWatermark.set(entries.isEmpty() ? null : entries.get(entries.size() - 1));
            }
        }

        /**
         * Conditionally appends {@code watermark} after the previous watermark's revision (or after the mark
         * revision when there is no previous watermark). When that write returns a non-null revision and a
         * previous watermark exists, the mark is advanced to the previous watermark's revision.
         *
         * @param watermark watermark to emit; {@code null} means nothing is written.
         */
        void completeIteration(Watermark watermark) {
            if (watermark == null) {
                return;
            }
            Map.Entry<Revision, Watermark> previous = previousWatermark.get();
            Revision expected = previous == null ? markRevision.get() : previous.getKey();
            Revision written = client.writeConditionally(expected, watermark);
            if (written != null && previous != null) {
                client.compareAndSetMark(markRevision.get(), previous.getKey());
            }
        }

        /**
         * Decides whether a writer still counts as active.
         * A participating writer is always active and is dropped from inactivity tracking.
         * A non-participating writer is active only while its mark reports it alive and it has been
         * tracked as inactive for less than {@code j} milliseconds.
         *
         * @param entry writer id and its latest mark.
         * @param j     inactivity timeout, compared against a difference of {@link System#currentTimeMillis()} values.
         * @return true if the writer should be treated as active.
         */
        boolean isWriterActive(Map.Entry<String, WriterMark> entry, long j) {
            String writerId = entry.getKey();
            WriterMark mark = entry.getValue();
            if (isWriterParticipating(mark.getTimestamp())) {
                inactiveWriters.remove(writerId);
                return true;
            }
            long now = System.currentTimeMillis();
            // Record the first time the writer was seen not participating; later calls keep that timestamp.
            inactiveWriters.putIfAbsent(writerId, now);
            long inactiveSince = inactiveWriters.getOrDefault(writerId, now);
            boolean timedOut = now - inactiveSince >= j;
            return mark.isAlive() && !timedOut;
        }

        /**
         * @param j timestamp reported by a writer.
         * @return true when there is no previous watermark or {@code j} is strictly greater than its lower time bound.
         */
        boolean isWriterParticipating(long j) {
            Map.Entry<Revision, Watermark> previous = previousWatermark.get();
            return previous == null || j > previous.getValue().getLowerTimeBound();
        }

        /**
         * Stops tracking the inactivity start time of the given writer.
         *
         * @param str writer id.
         */
        public void untrackWriterInactivity(String str) {
            inactiveWriters.remove(str);
        }

        /**
         * @param str writer id.
         * @return true if an inactivity start time is currently recorded for the writer.
         */
        @VisibleForTesting
        boolean isWriterTracked(String str) {
            return inactiveWriters.containsKey(str);
        }

        /** Closes the underlying revisioned stream client. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            client.close();
        }
    }

    /**
     * Creates a watermarking processor whose synchronizer client factories are built per scope
     * from the supplied client configuration.
     *
     * @param streamMetadataStore      stream metadata store.
     * @param bucketStore              bucket store.
     * @param clientConfig             configuration passed to {@link SynchronizerClientFactory#withScope}.
     * @param scheduledExecutorService executor used for asynchronous processing.
     */
    public PeriodicWatermarking(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, ClientConfig clientConfig, ScheduledExecutorService scheduledExecutorService) {
        this(streamMetadataStore,
             bucketStore,
             (Function<String, SynchronizerClientFactory>) scope -> SynchronizerClientFactory.withScope(scope, clientConfig),
             scheduledExecutorService);
    }

    /**
     * Creates a watermarking processor with a caller-supplied way of building synchronizer client factories.
     * Both caches hold at most {@code MAX_CACHE_SIZE} entries, expire entries 10 minutes after last access,
     * and close a value when it is removed.
     *
     * @param streamMetadataStore      stream metadata store.
     * @param bucketStore              bucket store.
     * @param function                 maps a scope name to the synchronizer client factory for that scope.
     * @param scheduledExecutorService executor used for asynchronous processing.
     */
    @VisibleForTesting
    PeriodicWatermarking(StreamMetadataStore streamMetadataStore, BucketStore bucketStore, final Function<String, SynchronizerClientFactory> function, ScheduledExecutorService scheduledExecutorService) {
        this.streamMetadataStore = streamMetadataStore;
        this.bucketStore = bucketStore;
        this.executor = scheduledExecutorService;
        // The builder is still typed <Object, Object> when the listener is attached, hence the cast on getValue().
        this.syncFactoryCache = CacheBuilder.newBuilder()
                .maximumSize(MAX_CACHE_SIZE)
                .expireAfterAccess(10L, TimeUnit.MINUTES)
                .removalListener(notification -> {
                    ((SynchronizerClientFactory) notification.getValue()).close();
                })
                .build(new CacheLoader<String, SynchronizerClientFactory>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public SynchronizerClientFactory load(String str) {
                        return function.apply(str);
                    }
                });
        this.watermarkClientCache = CacheBuilder.newBuilder()
                .maximumSize(MAX_CACHE_SIZE)
                .expireAfterAccess(10L, TimeUnit.MINUTES)
                .removalListener(notification -> {
                    ((WatermarkClient) notification.getValue()).close();
                })
                .build(new CacheLoader<Stream, WatermarkClient>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public WatermarkClient load(Stream stream) {
                        // A watermark client is built on top of the cached factory for the stream's scope.
                        return new WatermarkClient(stream, PeriodicWatermarking.this.syncFactoryCache.getUnchecked(stream.getScope()));
                    }
                });
    }

    /**
     * Runs one watermarking iteration for the given stream.
     * Streams in the {@code _system} scope are skipped. A missing writer-mark record is treated as
     * "no writers". Any failure is logged and the returned future still completes normally.
     *
     * @param stream stream to compute a watermark for.
     * @return future that completes with {@code null} once the iteration has finished or failed.
     */
    public CompletableFuture<Void> watermark(Stream stream) {
        final String scope = stream.getScope();
        final String streamName = stream.getStreamName();
        final OperationContext context = streamMetadataStore.createContext(scope, streamName);

        if (scope.equals("_system")) {
            return CompletableFuture.completedFuture(null);
        }

        log.debug("Periodic background processing for watermarking called for stream {}/{}", scope, streamName);
        return Futures.exceptionallyExpecting(
                        streamMetadataStore.getAllWriterMarks(scope, streamName, context, executor),
                        ex -> Exceptions.unwrap(ex) instanceof StoreException.DataNotFoundException,
                        Collections.emptyMap())
                .thenCompose(writers -> {
                    WatermarkClient watermarkClient = watermarkClientCache.getUnchecked(stream);
                    try {
                        watermarkClient.reinitialize();
                        return streamMetadataStore.getConfiguration(scope, streamName, context, executor)
                                .thenCompose(config -> filterWritersAndComputeWatermark(
                                        scope, streamName, context, watermarkClient, writers, config));
                    } catch (Exception e) {
                        log.warn("Watermarking client for stream {} threw exception {} during reinitialize.", stream, Exceptions.unwrap(e).getClass());
                        // Drop the cached client when its segment no longer exists, so the next lookup builds a new one.
                        if (Exceptions.unwrap(e) instanceof NoSuchSegmentException) {
                            log.info("Invalidating the watermark client in cache for stream {}.", stream);
                            watermarkClientCache.invalidate(stream);
                        }
                        throw e;
                    }
                })
                .exceptionally(failure -> {
                    log.warn("Exception thrown while trying to perform periodic watermark computation. Logging and ignoring.", failure);
                    return null;
                });
    }

    /**
     * Splits writers into active and inactive, removes the inactive ones from the store and, when possible,
     * computes and emits a new watermark from the active ones.
     *
     * @param scope           scope of the stream.
     * @param streamName      name of the stream.
     * @param context         operation context for store calls.
     * @param watermarkClient watermark client of the stream.
     * @param writers         writer id to latest writer mark.
     * @param config          stream configuration; supplies the timestamp aggregation timeout.
     * @return stage that completes when writer removal and (if any) watermark emission have completed.
     */
    private CompletionStage<Void> filterWritersAndComputeWatermark(String scope, String streamName, OperationContext context, WatermarkClient watermarkClient, Map<String, WriterMark> writers, StreamConfiguration config) {
        List<Map.Entry<String, WriterMark>> activeWriters = new ArrayList<>();
        List<Map.Entry<String, WriterMark>> inactiveWriters = new ArrayList<>();
        boolean allActiveAreParticipating = true;
        for (Map.Entry<String, WriterMark> writer : writers.entrySet()) {
            if (!watermarkClient.isWriterActive(writer, config.getTimestampAggregationTimeout())) {
                inactiveWriters.add(writer);
                continue;
            }
            activeWriters.add(writer);
            if (!watermarkClient.isWriterParticipating(writer.getValue().getTimestamp())) {
                allActiveAreParticipating = false;
            }
        }

        // Remove every inactive writer; a write conflict on removal is tolerated.
        CompletableFuture<List<Void>> removeInactiveWriters = Futures.allOfWithResults(
                inactiveWriters.stream()
                        .map(inactive -> Futures.exceptionallyExpecting(
                                streamMetadataStore.removeWriter(scope, streamName, inactive.getKey(), inactive.getValue(), context, executor)
                                        .thenAccept(v -> watermarkClient.untrackWriterInactivity(inactive.getKey())),
                                ex -> Exceptions.unwrap(ex) instanceof StoreException.WriteConflictException,
                                null))
                        .collect(Collectors.toList()));

        if (activeWriters.isEmpty()) {
            // No active writers: take the stream out of the watermarking bucket once removals are done.
            return removeInactiveWriters.thenCompose(v ->
                    bucketStore.removeStreamFromBucketStore(BucketStore.ServiceType.WatermarkingService, scope, streamName, executor));
        }

        CompletableFuture<Watermark> watermarkFuture;
        if (allActiveAreParticipating) {
            watermarkFuture = computeWatermark(scope, streamName, context, activeWriters, watermarkClient.getPreviousWatermark());
        } else {
            // Some active writer is not participating yet, so no watermark is emitted this round.
            watermarkFuture = CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.allOf(removeInactiveWriters, watermarkFuture.thenAccept(watermarkClient::completeIteration));
    }

    /**
     * Builds a new watermark from the marks of the active writers.
     * The lower and upper time bounds are the minimum and maximum writer timestamps. The stream cut is
     * derived from the writers' positions, combined with the previous watermark.
     *
     * @param scope             scope of the stream.
     * @param streamName        name of the stream.
     * @param context           operation context for store calls.
     * @param activeWriters     active writers and their marks.
     * @param previousWatermark watermark emitted previously.
     * @return future holding the new watermark, or {@code null} when the minimum writer timestamp
     *         has not advanced beyond the previous watermark's lower time bound.
     */
    private CompletableFuture<Watermark> computeWatermark(String scope, String streamName, OperationContext context, List<Map.Entry<String, WriterMark>> activeWriters, Watermark previousWatermark) {
        Watermark.WatermarkBuilder builder = Watermark.builder();
        ConcurrentHashMap<SegmentWithRange, Long> upperBound = new ConcurrentHashMap<>();

        LongSummaryStatistics timestamps = activeWriters.stream()
                .collect(Collectors.summarizingLong(writer -> writer.getValue().getTimestamp()));
        long lowerBoundOnTime = timestamps.getMin();
        long upperBoundOnTime = timestamps.getMax();

        if (lowerBoundOnTime <= previousWatermark.getLowerTimeBound()) {
            // Time has not advanced; nothing to emit.
            return CompletableFuture.completedFuture(null);
        }

        // For each writer, resolve every segment number in its position to a segment with key range.
        CompletableFuture<List<Map<SegmentWithRange, Long>>> positionsFuture = Futures.allOfWithResults(
                activeWriters.stream().map(writer -> {
                    return Futures.keysAllOfWithResults(
                            writer.getValue().getPosition().entrySet().stream()
                                    .collect(Collectors.toMap(
                                            position -> getSegmentWithRange(scope, streamName, context, position.getKey()),
                                            Map.Entry::getValue)));
                }).collect(Collectors.toList()));

        log.debug("Emitting watermark for stream {}/{} with time {}", scope, streamName, lowerBoundOnTime);
        return positionsFuture
                .thenAccept(positions -> positions.forEach(position -> addToUpperBound(position, upperBound)))
                .thenCompose(v -> computeStreamCut(scope, streamName, context, upperBound, previousWatermark)
                        .thenApply(streamCut -> builder.lowerTimeBound(lowerBoundOnTime)
                                .upperTimeBound(upperBoundOnTime)
                                .streamCut(ImmutableMap.copyOf(streamCut))
                                .build()));
    }

    /**
     * Looks up a segment in the metadata store and converts it to a {@link SegmentWithRange}.
     *
     * @param scope      scope of the stream.
     * @param streamName name of the stream.
     * @param context    operation context for the store call.
     * @param segmentId  id of the segment to fetch.
     * @return future holding the segment id together with its key range.
     */
    private CompletableFuture<SegmentWithRange> getSegmentWithRange(String scope, String streamName, OperationContext context, long segmentId) {
        return streamMetadataStore
                .getSegment(scope, streamName, segmentId, context, executor)
                .thenApply(segmentRecord -> transform(segmentRecord));
    }

    /**
     * Merges segment offsets into {@code upperBound}.
     * <ul>
     *   <li>A segment already present keeps the larger of the two offsets.</li>
     *   <li>A new segment is skipped when {@code upperBound} already holds an overlapping segment with a
     *       higher segment id.</li>
     *   <li>Otherwise every overlapping segment with a lower segment id is removed and the new segment is added.</li>
     * </ul>
     *
     * @param segmentOffsets segment to offset entries to merge in.
     * @param upperBound     map that is updated in place.
     */
    private void addToUpperBound(Map<SegmentWithRange, Long> segmentOffsets, Map<SegmentWithRange, Long> upperBound) {
        for (Map.Entry<SegmentWithRange, Long> position : segmentOffsets.entrySet()) {
            SegmentWithRange segment = position.getKey();
            long offset = position.getValue();
            if (upperBound.containsKey(segment)) {
                upperBound.merge(segment, offset, Math::max);
            } else if (!hasSuccessors(segment, upperBound.keySet())) {
                // removeIf removes through the key-set view, so this does not depend on the map being a concurrent one.
                upperBound.keySet().removeIf(existing ->
                        segment.overlaps(existing) && segment.getSegmentId() > existing.getSegmentId());
                upperBound.put(segment, offset);
            }
        }
    }

    /**
     * @param segment    segment to test.
     * @param candidates segments to search.
     * @return true if some candidate overlaps {@code segment} and has a strictly higher segment id.
     */
    private boolean hasSuccessors(SegmentWithRange segment, Set<SegmentWithRange> candidates) {
        for (SegmentWithRange candidate : candidates) {
            if (segment.overlaps(candidate) && segment.getSegmentId() < candidate.getSegmentId()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Turns the upper bound on writer positions into a stream cut with no gaps in the key space.
     * The previous watermark's stream cut is merged in first. Then, repeatedly, the epoch with the highest
     * number referenced by the current cut is fetched and its segments overlapping each missing key range
     * are added at offset 0, until no range is missing.
     *
     * @param scope             scope of the stream.
     * @param streamName        name of the stream.
     * @param context           operation context for store calls.
     * @param upperBound        segment to offset map computed from writer positions.
     * @param previousWatermark previously emitted watermark; ignored when null or {@link Watermark#EMPTY}.
     * @return future holding the completed stream cut.
     */
    private CompletableFuture<Map<SegmentWithRange, Long>> computeStreamCut(String scope, String streamName, OperationContext context, Map<SegmentWithRange, Long> upperBound, Watermark previousWatermark) {
        ConcurrentHashMap<SegmentWithRange, Long> streamCut = new ConcurrentHashMap<>(upperBound);
        // Missing ranges are computed from the writer positions alone, before the previous watermark is merged in.
        AtomicReference<Map<Double, Double>> missingRanges = new AtomicReference<>(findMissingRanges(streamCut));

        if (previousWatermark != null && !previousWatermark.equals(Watermark.EMPTY)) {
            addToUpperBound(previousWatermark.getStreamCut(), streamCut);
        }

        return Futures.doWhileLoop(() -> {
            int highestEpoch = streamCut.keySet().stream()
                    .mapToInt(segment -> NameUtils.getEpoch(segment.getSegmentId()))
                    .max()
                    .orElse(-1);
            // Desugared form of `assert highestEpoch >= 0`, kept explicit because the class declares the flag itself.
            if (!$assertionsDisabled && highestEpoch < 0) {
                throw new AssertionError();
            }
            return streamMetadataStore.getEpoch(scope, streamName, highestEpoch, context, executor)
                    .thenApply(epochRecord -> {
                        missingRanges.get().entrySet().forEach(missingRange -> {
                            List<SegmentWithRange> replacement = findSegmentsForMissingRange(epochRecord, missingRange);
                            Map<SegmentWithRange, Long> replacementOffsets = replacement.stream()
                                    .collect(Collectors.toMap(segment -> segment, segment -> 0L));
                            addToUpperBound(replacementOffsets, streamCut);
                        });
                        return missingRanges.updateAndGet(ignored -> findMissingRanges(streamCut));
                    });
        }, ranges -> !ranges.isEmpty(), executor).thenApply(v -> streamCut);
    }

    /**
     * Converts a store segment record into a {@link SegmentWithRange} carrying the same segment id,
     * with the record's key start and key end as the low and high ends of the range.
     *
     * @param segmentRecord segment record read from the metadata store.
     * @return the equivalent segment-with-range value.
     */
    private SegmentWithRange transform(StreamSegmentRecord segmentRecord) {
        return SegmentWithRange.builder()
                .segmentId(segmentRecord.segmentId())
                .rangeLow(segmentRecord.getKeyStart())
                .rangeHigh(segmentRecord.getKeyEnd())
                .build();
    }

    /**
     * Selects the segments of an epoch that overlap a missing key range.
     *
     * @param epochRecord  epoch whose segments are searched.
     * @param missingRange entry whose key is the low end and whose value is the high end of the range.
     * @return the overlapping segments, converted to {@link SegmentWithRange}.
     */
    private List<SegmentWithRange> findSegmentsForMissingRange(EpochRecord epochRecord, Map.Entry<Double, Double> missingRange) {
        return epochRecord.getSegments()
                .stream()
                .filter(segment -> segment.overlaps(missingRange.getKey(), missingRange.getValue()))
                .map(segment -> transform(segment))
                .collect(Collectors.toList());
    }

    /**
     * Finds the parts of the key space [0.0, 1.0] that the given segments leave uncovered.
     * Segments are ordered by the low end of their range. A gap is reported before the first segment
     * if it starts above 0.0, between neighbours whose ends do not meet exactly, and after the last
     * segment if it ends below 1.0.
     *
     * @param map segment to offset map; only the keys are inspected. Must not be empty.
     * @return map from the low end to the high end of every gap; empty when there is none.
     */
    private Map<Double, Double> findMissingRanges(Map<SegmentWithRange, Long> map) {
        List<SegmentWithRange> ordered = new ArrayList<>(map.keySet());
        ordered.sort(Comparator.comparingDouble(SegmentWithRange::getRangeLow));

        Map<Double, Double> gaps = new HashMap<>();
        SegmentWithRange previous = ordered.get(0);
        if (previous.getRangeLow() > 0.0d) {
            gaps.put(0.0d, previous.getRangeLow());
        }
        for (SegmentWithRange current : ordered.subList(1, ordered.size())) {
            if (previous.getRangeHigh() != current.getRangeLow()) {
                gaps.put(previous.getRangeHigh(), current.getRangeLow());
            }
            previous = current;
        }
        if (previous.getRangeHigh() < 1.0d) {
            gaps.put(previous.getRangeHigh(), 1.0d);
        }
        return gaps;
    }

    /**
     * @param stream stream to look up.
     * @return true if the watermark client cache currently holds an entry for the stream.
     */
    @VisibleForTesting
    boolean checkExistsInCache(Stream stream) {
        Map<Stream, WatermarkClient> cachedClients = watermarkClientCache.asMap();
        return cachedClients.containsKey(stream);
    }

    /**
     * @param str scope name to look up.
     * @return true if the synchronizer client factory cache currently holds an entry for the scope.
     */
    @VisibleForTesting
    boolean checkExistsInCache(String str) {
        Map<String, SynchronizerClientFactory> cachedFactories = syncFactoryCache.asMap();
        return cachedFactories.containsKey(str);
    }

    /**
     * Invalidates the cached synchronizer client factory of a scope, if any.
     *
     * @param str scope name whose factory entry is to be dropped.
     */
    @VisibleForTesting
    void evictFromCache(String str) {
        final LoadingCache<String, SynchronizerClientFactory> factories = syncFactoryCache;
        factories.invalidate(str);
    }

    // Static initializer: captures the class's assertion status and creates the class logger.
    static {
        // True when assertions are NOT enabled for this class; read by the epoch check in computeStreamCut.
        $assertionsDisabled = !PeriodicWatermarking.class.desiredAssertionStatus();
        log = new TagLogger(LoggerFactory.getLogger(PeriodicWatermarking.class));
    }
}
