package io.pravega.client.stream.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.control.impl.ControllerFailureException;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.segment.impl.NoSuchSegmentException;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Stream;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.hash.RandomFactory;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.connectors.flink.table.descriptors.Pravega;
import io.pravega.shaded.com.google.common.base.Preconditions;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/SegmentSelector.class */
public class SegmentSelector {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SegmentSelector.class);
    private final Stream stream;
    private final Controller controller;
    private final SegmentOutputStreamFactory outputStreamFactory;

    @GuardedBy("$lock")
    private StreamSegments currentSegments;
    private final EventWriterConfig config;
    private final DelegationTokenProvider tokenProvider;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private final Object $lock = new Object[0];

    @GuardedBy("$lock")
    private final Random random = RandomFactory.create();

    @GuardedBy("$lock")
    private final Map<Segment, SegmentOutputStream> writers = new HashMap();

    public SegmentOutputStream getSegmentOutputStreamForKey(String str) {
        synchronized (this.$lock) {
            if (this.currentSegments == null) {
                return null;
            }
            return this.writers.get(getSegmentForEvent(str));
        }
    }

    public Segment getSegmentForEvent(String str) {
        synchronized (this.$lock) {
            if (this.currentSegments == null) {
                return null;
            }
            if (str == null) {
                return this.currentSegments.getSegmentForKey(this.random.nextDouble());
            }
            return this.currentSegments.getSegmentForKey(str);
        }
    }

    public List<PendingEvent> refreshSegmentEventWritersUponSealed(Segment segment, Consumer<Segment> consumer) {
        StreamSegmentsWithPredecessors streamSegmentsWithPredecessors = (StreamSegmentsWithPredecessors) Futures.getAndHandleExceptions(this.controller.getSuccessors(segment), th -> {
            log.error("Error while fetching successors for segment: {}", segment, th);
            Exception controllerFailureException = th instanceof RetriesExhaustedException ? new ControllerFailureException(th) : new NoSuchSegmentException(segment.toString(), th);
            removeAllWriters().forEach(pendingEvent -> {
                pendingEvent.getAckFuture().completeExceptionally(controllerFailureException);
            });
            return null;
        });
        if (streamSegmentsWithPredecessors == null) {
            return Collections.emptyList();
        }
        if (!streamSegmentsWithPredecessors.getSegmentToPredecessor().isEmpty()) {
            return updateSegmentsUponSealed(streamSegmentsWithPredecessors, segment, consumer);
        }
        log.warn("Stream {} is sealed since no successor segments found for segment {} ", segment.getStream(), segment);
        IllegalStateException illegalStateException = new IllegalStateException("Writes cannot proceed since the stream is sealed");
        removeAllWriters().forEach(pendingEvent -> {
            pendingEvent.getAckFuture().completeExceptionally(illegalStateException);
        });
        return Collections.emptyList();
    }

    public List<PendingEvent> refreshSegmentEventWriters(Consumer<Segment> consumer) {
        log.info("Refreshing segments for stream {}", this.stream);
        return updateSegments((StreamSegments) Futures.getAndHandleExceptions(this.controller.getCurrentSegments(this.stream.getScope(), this.stream.getStreamName()), RuntimeException::new), consumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSegmentWriter(Segment segment) {
        synchronized (this.$lock) {
            this.writers.remove(segment);
        }
    }

    private List<PendingEvent> updateSegments(StreamSegments streamSegments, Consumer<Segment> consumer) {
        ArrayList arrayList;
        synchronized (this.$lock) {
            Preconditions.checkState(streamSegments.getNumberOfSegments() > 0, "Writers cannot proceed writing since the stream %s is sealed", this.stream);
            this.currentSegments = streamSegments;
            createMissingWriters(consumer);
            arrayList = new ArrayList();
            Iterator<Map.Entry<Segment, SegmentOutputStream>> it = this.writers.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Segment, SegmentOutputStream> next = it.next();
                if (!this.currentSegments.getSegments().contains(next.getKey())) {
                    SegmentOutputStream value = next.getValue();
                    log.info("Closing writer {} on segment {} during segment refresh", value, next.getKey());
                    it.remove();
                    try {
                        value.close();
                    } catch (SegmentSealedException e) {
                        log.info("Caught segment sealed while refreshing on segment {}", next.getKey());
                    }
                    arrayList.addAll(value.getUnackedEventsOnSeal());
                }
            }
        }
        return arrayList;
    }

    private List<PendingEvent> updateSegmentsUponSealed(StreamSegmentsWithPredecessors streamSegmentsWithPredecessors, Segment segment, Consumer<Segment> consumer) {
        List<PendingEvent> unackedEventsOnSeal;
        synchronized (this.$lock) {
            this.currentSegments = this.currentSegments.withReplacementRange(segment, streamSegmentsWithPredecessors);
            createMissingWriters(consumer);
            log.debug("Fetch unacked events for segment: {}, and adding new segments {}", segment, this.currentSegments);
            unackedEventsOnSeal = this.writers.get(segment).getUnackedEventsOnSeal();
        }
        return unackedEventsOnSeal;
    }

    private List<PendingEvent> removeAllWriters() {
        ArrayList arrayList;
        synchronized (this.$lock) {
            arrayList = new ArrayList();
            this.writers.values().forEach(segmentOutputStream -> {
                arrayList.addAll(segmentOutputStream.getUnackedEventsOnSeal());
            });
            this.writers.clear();
        }
        return arrayList;
    }

    private void createMissingWriters(Consumer<Segment> consumer) {
        this.tokenProvider.populateToken(this.currentSegments.getDelegationToken());
        for (Segment segment : this.currentSegments.getSegments()) {
            if (!this.writers.containsKey(segment)) {
                log.debug("Creating writer for segment {}", segment);
                this.writers.put(segment, this.outputStreamFactory.createOutputStreamForSegment(segment, consumer, this.config, this.tokenProvider));
            }
        }
    }

    public List<Segment> getSegments() {
        synchronized (this.$lock) {
            if (this.currentSegments == null) {
                return Collections.emptyList();
            }
            return new ArrayList(this.currentSegments.getSegments());
        }
    }

    public Map<Segment, SegmentOutputStream> getWriters() {
        HashMap hashMap;
        synchronized (this.$lock) {
            hashMap = new HashMap(this.writers);
        }
        return hashMap;
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    @ConstructorProperties({Pravega.CONNECTOR_READER_STREAM_INFO_STREAM, "controller", "outputStreamFactory", "config", "tokenProvider"})
    public SegmentSelector(Stream stream, Controller controller, SegmentOutputStreamFactory segmentOutputStreamFactory, EventWriterConfig eventWriterConfig, DelegationTokenProvider delegationTokenProvider) {
        this.stream = stream;
        this.controller = controller;
        this.outputStreamFactory = segmentOutputStreamFactory;
        this.config = eventWriterConfig;
        this.tokenProvider = delegationTokenProvider;
    }
}
