package io.pravega.segmentstore.server.containers;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectBuilder;
import io.pravega.common.Timer;
import io.pravega.common.concurrent.AbstractThreadPoolService;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.io.BoundedInputStream;
import io.pravega.common.io.serialization.RevisionDataInput;
import io.pravega.common.io.serialization.RevisionDataOutput;
import io.pravega.common.io.serialization.VersionedSerializer;
import io.pravega.common.util.BufferView;
import io.pravega.common.util.ByteArraySegment;
import io.pravega.segmentstore.contracts.SegmentType;
import io.pravega.segmentstore.contracts.StreamSegmentNotExistsException;
import io.pravega.segmentstore.server.ContainerEventProcessor;
import io.pravega.segmentstore.server.DirectSegmentAccess;
import io.pravega.segmentstore.server.SegmentContainer;
import io.pravega.segmentstore.server.SegmentMetadata;
import io.pravega.segmentstore.server.SegmentStoreMetrics;
import io.pravega.segmentstore.server.reading.AsyncReadResultProcessor;
import io.pravega.shared.NameUtils;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl.class */
public class ContainerEventProcessorImpl implements ContainerEventProcessor {

    /** Logger shared by this class and by the nested classes declared in this file. */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ContainerEventProcessorImpl.class);
    /** Value returned by {@code EventProcessorImpl.getShutdownTimeout()}. */
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
    /** Segment type used when the backing segment of a processor has to be registered in the metadata store. */
    private static final SegmentType SYSTEM_CRITICAL_SEGMENT = SegmentType.builder().system().internal().critical().build();
    /** Container id; used to build the trace id and handed to every EventProcessorImpl created here. */
    private final int containerId;

    /** Registered processors, keyed by name. All compound accesses synchronize on this map instance. */
    @VisibleForTesting
    private final Map<String, CompletableFuture<ContainerEventProcessor.EventProcessor>> eventProcessorMap;
    /** Resolves a processor name to a future yielding the segment that backs the processor. */
    private final Function<String, CompletableFuture<DirectSegmentAccess>> segmentSupplier;
    /** Delay handed to each EventProcessorImpl; it waits this long before an iteration after a failure or when no bytes are outstanding. */
    private final Duration iterationDelay;
    /** Timeout handed to each EventProcessorImpl for its segment read and truncate calls. */
    private final Duration containerOperationTimeout;
    /** Set exactly once by {@link #close()}; guards against creating processors afterwards. */
    private final AtomicBoolean closed;
    /** Prefix used in every log message emitted by this instance. */
    private final String traceObjectId;
    /** Executor handed to each EventProcessorImpl. */
    private final ScheduledExecutorService executor;

    /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$EventProcessorImpl.class */
    static class EventProcessorImpl extends AbstractThreadPoolService implements ContainerEventProcessor.EventProcessor {
        /** Serializer used both to append events and to decode them when reading back. */
        private static final ProcessorEventData.ProcessorEventDataSerializer SERIALIZER = new ProcessorEventData.ProcessorEventDataSerializer();
        /** Processor name; used for metrics and logging. */
        private final String name;
        /** Id of the owning container; used for metrics. */
        private final int containerId;
        /** User callback applied to every batch of events read from the segment. */
        private final Function<List<BufferView>, CompletableFuture<Void>> handler;
        /** Supplies the batch size limit and the outstanding-bytes limit. */
        private final ContainerEventProcessor.EventProcessorConfig config;
        /** Delay applied before an iteration when the previous one failed or nothing is outstanding. */
        private final Duration iterationDelay;
        /** Timeout applied to segment read and truncate operations. */
        private final Duration containerOperationTimeout;
        /** Callback run at the end of {@link #close()}. */
        private final Runnable onClose;
        /** Segment that durably stores the events of this processor. */
        private final DirectSegmentAccess segment;
        /** Metrics reporter for this processor; closed together with the processor. */
        private final SegmentStoreMetrics.EventProcessor metrics;
        /** Set exactly once by {@link #close()}. */
        private final AtomicBoolean closed;
        /** True when the last processing iteration ended with an error other than NoDataAvailableException. */
        private final AtomicBoolean failedIteration;
        /** Locally tracked read position; advanced after each successful truncation. */
        private final AtomicLong segmentStartOffset;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$EventProcessorImpl$EventsReadAndTruncationLength.class */
        /**
         * Immutable pair holding a batch of deserialized events together with the number of bytes
         * that the batch occupies in the segment (i.e., how far the segment can be truncated once
         * the batch has been handled).
         */
        public static class EventsReadAndTruncationLength {
            private final List<ProcessorEventData> eventsRead;
            private final long truncationLength;

            /**
             * Creates a new instance.
             *
             * @param list events that were read; stored as-is (no defensive copy).
             * @param j    number of bytes covered by {@code list}.
             */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            @ConstructorProperties({"eventsRead", "truncationLength"})
            public EventsReadAndTruncationLength(List<ProcessorEventData> list, long j) {
                this.eventsRead = list;
                this.truncationLength = j;
            }

            /**
             * Extracts the payload of every event in this batch.
             *
             * @return an unmodifiable list with one {@link BufferView} per event, in batch order.
             */
            public List<BufferView> getProcessorEventsData() {
                return this.eventsRead.stream()
                        .map(ProcessorEventData::getData)
                        .collect(Collectors.toUnmodifiableList());
            }

            /** @return the events in this batch (the same list instance passed to the constructor). */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            public List<ProcessorEventData> getEventsRead() {
                return this.eventsRead;
            }

            /** @return the number of bytes covered by the events in this batch. */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            public long getTruncationLength() {
                return this.truncationLength;
            }

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            @Override
            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (!(obj instanceof EventsReadAndTruncationLength)) {
                    return false;
                }
                final EventsReadAndTruncationLength other = (EventsReadAndTruncationLength) obj;
                if (!other.canEqual(this)) {
                    return false;
                }
                final List<ProcessorEventData> mine = getEventsRead();
                final List<ProcessorEventData> theirs = other.getEventsRead();
                final boolean sameEvents = (mine == null) ? theirs == null : mine.equals(theirs);
                return sameEvents && getTruncationLength() == other.getTruncationLength();
            }

            /** Lets subclasses veto equality with instances of this class. */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            protected boolean canEqual(Object obj) {
                return obj instanceof EventsReadAndTruncationLength;
            }

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            @Override
            public int hashCode() {
                final List<ProcessorEventData> events = getEventsRead();
                final int eventsHash = (events == null) ? 43 : events.hashCode();
                // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)).
                return (59 + eventsHash) * 59 + Long.hashCode(getTruncationLength());
            }

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            @Override
            public String toString() {
                return "ContainerEventProcessorImpl.EventProcessorImpl.EventsReadAndTruncationLength(eventsRead=" + getEventsRead() + ", truncationLength=" + getTruncationLength() + ")";
            }
        }

        /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$EventProcessorImpl$NoDataAvailableException.class */
        /**
         * Signals that a read iteration produced no complete event. The processing loop treats
         * this outcome as a non-failure.
         */
        static class NoDataAvailableException extends RuntimeException {
            NoDataAvailableException() {
                super();
            }
        }

        public EventProcessorImpl(@NonNull String str, int i, @NonNull DirectSegmentAccess directSegmentAccess, @NonNull Function<List<BufferView>, CompletableFuture<Void>> function, @NonNull ContainerEventProcessor.EventProcessorConfig eventProcessorConfig, @NonNull Duration duration, @NonNull Duration duration2, @NonNull Runnable runnable, @NonNull ScheduledExecutorService scheduledExecutorService) {
            super(String.format("EventProcessor[%d-%s]", Integer.valueOf(i), Long.valueOf(directSegmentAccess.getSegmentId())), scheduledExecutorService);
            if (str == null) {
                throw new NullPointerException("name is marked non-null but is null");
            }
            if (directSegmentAccess == null) {
                throw new NullPointerException("segment is marked non-null but is null");
            }
            if (function == null) {
                throw new NullPointerException("handler is marked non-null but is null");
            }
            if (eventProcessorConfig == null) {
                throw new NullPointerException("config is marked non-null but is null");
            }
            if (duration == null) {
                throw new NullPointerException("iterationDelay is marked non-null but is null");
            }
            if (duration2 == null) {
                throw new NullPointerException("containerOperationTimeout is marked non-null but is null");
            }
            if (runnable == null) {
                throw new NullPointerException("onClose is marked non-null but is null");
            }
            if (scheduledExecutorService == null) {
                throw new NullPointerException("executor is marked non-null but is null");
            }
            this.name = str;
            this.containerId = i;
            this.segment = directSegmentAccess;
            this.handler = function;
            this.config = eventProcessorConfig;
            this.iterationDelay = duration;
            this.containerOperationTimeout = duration2;
            this.onClose = runnable;
            this.metrics = new SegmentStoreMetrics.EventProcessor(str, i);
            this.closed = new AtomicBoolean(false);
            this.failedIteration = new AtomicBoolean(false);
            this.segmentStartOffset = new AtomicLong(directSegmentAccess.getInfo().getStartOffset());
        }

        /**
         * Serializes the given event and appends it to the segment backing this processor.
         *
         * @param bufferView event payload; must be non-null, non-empty and, together with a
         *                   4-byte length prefix, smaller than {@link ProcessorEventData#MAX_EVENT_SIZE}.
         * @param duration   timeout for the append operation.
         * @return a future that, once the append completes, yields the number of outstanding
         *         (appended but not yet truncated) bytes in the segment.
         * @throws NullPointerException     if {@code bufferView} is null.
         * @throws IllegalArgumentException if the payload size is out of bounds.
         * @throws ContainerEventProcessor.TooManyOutstandingBytesException if the outstanding bytes
         *                                  already exceed the configured maximum.
         * @throws UncheckedIOException     if the event cannot be serialized.
         */
        @Override
        public CompletableFuture<Long> add(@NonNull BufferView bufferView, Duration duration) throws ContainerEventProcessor.TooManyOutstandingBytesException {
            if (bufferView == null) {
                throw new NullPointerException("event is marked non-null but is null");
            }
            Preconditions.checkArgument(bufferView.getLength() > 0);
            Preconditions.checkArgument(bufferView.getLength() + Integer.BYTES < ProcessorEventData.MAX_EVENT_SIZE);
            Exceptions.checkNotClosed(this.closed.get(), this);
            // Refuse new events while the backlog is above the configured limit.
            if (getOutstandingBytes() > this.config.getMaxProcessorOutstandingBytes()) {
                throw new ContainerEventProcessor.TooManyOutstandingBytesException(this.traceObjectId);
            }

            final ProcessorEventData eventData = ProcessorEventData.builder().data(bufferView).m11build();
            try {
                return this.segment.append(SERIALIZER.serialize(eventData), null, duration)
                        .thenApply(offset -> getOutstandingBytes());
            } catch (IOException e) {
                // This method does not declare IOException, so surface it unchecked with the cause preserved.
                throw new UncheckedIOException(e);
            }
        }

        /**
         * Computes how many bytes are currently stored in the segment, i.e. its length minus its
         * start offset, both taken from a single metadata snapshot.
         *
         * @return the number of outstanding bytes.
         */
        @VisibleForTesting
        long getOutstandingBytes() {
            final SegmentMetadata metadata = this.segment.getInfo();
            final long length = metadata.getLength();
            final long startOffset = metadata.getStartOffset();
            return length - startOffset;
        }

        /**
         * Closes this processor: stops the service, closes its metrics and finally runs the
         * {@code onClose} callback. Only the first invocation has any effect.
         */
        @Override
        public void close() {
            // Only the caller that flips the flag from false to true performs the shutdown.
            if (this.closed.compareAndSet(false, true)) {
                ContainerEventProcessorImpl.log.info("{}: Closing EventProcessor.", this.traceObjectId);
                super.close();
                this.metrics.close();
                this.onClose.run();
            }
        }

        /**
         * Returns the shutdown timeout for this processor, which is the enclosing class's
         * {@code SHUTDOWN_TIMEOUT} constant.
         * NOTE(review): presumably implements a hook declared by AbstractThreadPoolService - confirm.
         */
        protected Duration getShutdownTimeout() {
            return ContainerEventProcessorImpl.SHUTDOWN_TIMEOUT;
        }

        /**
         * Runs the processing loop: while this processor is not closed, optionally waits and then
         * executes one processing iteration.
         *
         * @return a future that completes normally when the loop ends; a failure of the loop is
         *         logged and not propagated.
         */
        public CompletableFuture<Void> doRun() {
            Exceptions.checkNotClosed(this.closed.get(), this);
            ContainerEventProcessorImpl.log.info("{} Starting processing.", this.traceObjectId);
            return Futures.loop(
                    () -> !this.closed.get(),
                    () -> getDelayedFutureIfNeeded().thenComposeAsync(ignored -> processEvents(), this.executor),
                    this.executor)
                    .handle((ignored, failure) -> {
                        if (failure == null) {
                            ContainerEventProcessorImpl.log.info("{}: Terminated.", this.traceObjectId);
                        } else {
                            ContainerEventProcessorImpl.log.warn("{}: Terminated due to unexpected exception.", this.traceObjectId, failure);
                        }
                        return null;
                    });
        }

        /**
         * Produces the wait that precedes a processing iteration: the configured iteration delay
         * when the previous iteration failed or there is nothing to read, otherwise no delay.
         *
         * @return a future that completes after the chosen delay.
         */
        private CompletableFuture<Void> getDelayedFutureIfNeeded() {
            final boolean mustBackOff = this.failedIteration.get() || getOutstandingBytes() == 0;
            final Duration delay = mustBackOff ? this.iterationDelay : Duration.ZERO;
            return Futures.delayedFuture(delay, this.executor);
        }

        /**
         * Executes one processing iteration: read a batch of events, pass it to the handler and,
         * if the handler succeeds, truncate the segment past the batch.
         *
         * <p>The returned future never fails. An error other than {@link NoDataAvailableException}
         * marks the iteration as failed (so the next one is delayed) and re-synchronizes the local
         * start offset with the segment's metadata.
         *
         * @return a future that completes when the iteration is over.
         */
        private CompletableFuture<Void> processEvents() {
            final Timer timer = new Timer();
            return readEvents()
                    .thenComposeAsync(this::applyProcessorHandler, this.executor)
                    .thenComposeAsync(batch -> truncateInternalSegment(batch, timer), this.executor)
                    .handleAsync((ignored, th) -> {
                        if (th == null || (Exceptions.unwrap(th) instanceof NoDataAvailableException)) {
                            // Success, or simply nothing to read: not a failure.
                            this.failedIteration.set(false);
                        } else {
                            ContainerEventProcessorImpl.log.warn("{}: Processing iteration failed, retrying.", this.traceObjectId, th);
                            this.failedIteration.set(true);
                            // The failure may have left the local offset out of sync with the segment.
                            reconcileStartOffset();
                        }
                        ContainerEventProcessorImpl.log.debug("{}: Finished iteration for EventProcessor (Name = {}, Start offset = {}, Failed iteration = {}).",
                                this.traceObjectId, this.name, this.segmentStartOffset.get(), this.failedIteration.get());
                        return null;
                    });
        }

        /**
         * Overwrites the locally tracked start offset with the start offset currently reported by
         * the segment's metadata, logging both values.
         */
        private void reconcileStartOffset() {
            final long actualStartOffset = this.segment.getInfo().getStartOffset();
            ContainerEventProcessorImpl.log.info("{}: Reconciling start offset from {} to {}.",
                    this.traceObjectId, this.segmentStartOffset.get(), actualStartOffset);
            this.segmentStartOffset.set(actualStartOffset);
        }

        /**
         * Reads from the segment, starting at the locally tracked start offset, at most
         * {@code 2 * ProcessorEventData.MAX_EVENT_SIZE} bytes (or fewer if less is outstanding),
         * and deserializes the events found in that data. Also reports the outstanding bytes metric.
         *
         * @return a future with the events read and the number of bytes they span.
         */
        private CompletableFuture<EventsReadAndTruncationLength> readEvents() {
            return CompletableFuture
                    .supplyAsync(() -> {
                        final long pendingBytes = getOutstandingBytes();
                        SegmentStoreMetrics.outstandingEventProcessorBytes(this.name, this.containerId, pendingBytes);
                        final int readLength = (int) Math.min(pendingBytes, 2L * ProcessorEventData.MAX_EVENT_SIZE);
                        return this.segment.read(this.segmentStartOffset.get(), readLength, this.containerOperationTimeout);
                    }, this.executor)
                    .thenCompose(result -> AsyncReadResultProcessor.processAll(result, this.executor, this.containerOperationTimeout))
                    .thenApply(this::deserializeEvents);
        }

        /**
         * Deserializes events from the given data until the data is exhausted or the configured
         * maximum number of items per batch is reached.
         *
         * <p>If at least one event was decoded, the batch is returned even when a later event
         * could not be decoded; the reported truncation length covers only the decoded events.
         *
         * @param bufferView raw data read from the segment.
         * @return the decoded events and the number of bytes they span.
         * @throws NoDataAvailableException if no event could be decoded because the data is empty
         *                                  or ends in the middle of the first event.
         * @throws RuntimeException         if no event could be decoded due to any other error;
         *                                  checked errors are wrapped in a {@link CompletionException}.
         */
        private EventsReadAndTruncationLength deserializeEvents(BufferView bufferView) {
            final List<ProcessorEventData> events = new ArrayList<>();
            final int length = bufferView.getLength();
            long truncationLength = 0;
            Exception failure = null;
            try (BoundedInputStream input = new BoundedInputStream(bufferView.getReader(), length)) {
                while (input.getRemaining() > 0 && events.size() < this.config.getMaxItemsAtOnce()) {
                    events.add(SERIALIZER.deserialize(input));
                    // Only advance past events that were fully decoded.
                    truncationLength = length - input.getRemaining();
                }
            } catch (BufferView.Reader.OutOfBoundsException e) {
                // The read ended in the middle of an event; this must be checked before the
                // generic handler below so that it is not reported as a processing failure.
                failure = new NoDataAvailableException();
            } catch (Exception e) {
                ContainerEventProcessorImpl.log.error("{}: Error deserializing events (SegmentId = {}, Initial offset = {}, Read length = {}, Truncation length = {}).",
                        this.traceObjectId, this.segment.getSegmentId(), this.segmentStartOffset.get(), length, truncationLength, e);
                failure = e;
            }

            if (!events.isEmpty()) {
                return new EventsReadAndTruncationLength(events, truncationLength);
            }
            if (failure == null) {
                throw new NoDataAvailableException();
            }
            if (failure instanceof RuntimeException) {
                throw (RuntimeException) failure;
            }
            // Checked failure (e.g. IOException): wrap it so it can leave this method.
            throw new CompletionException(failure);
        }

        /**
         * Invokes the handler with the payloads of the given batch.
         *
         * @param eventsReadAndTruncationLength batch to process.
         * @return a future that yields the same batch once the handler's future completes.
         */
        private CompletableFuture<EventsReadAndTruncationLength> applyProcessorHandler(EventsReadAndTruncationLength eventsReadAndTruncationLength) {
            final List<BufferView> payloads = eventsReadAndTruncationLength.getProcessorEventsData();
            return this.handler.apply(payloads).thenApply(ignored -> eventsReadAndTruncationLength);
        }

        /**
         * Truncates the segment just past the given batch and, on success, advances the locally
         * tracked start offset by the same amount and records the iteration latency.
         *
         * @param eventsReadAndTruncationLength batch that has been processed.
         * @param timer                         timer started at the beginning of the iteration.
         * @return a future that completes when the truncation and bookkeeping are done.
         */
        private CompletableFuture<Void> truncateInternalSegment(EventsReadAndTruncationLength eventsReadAndTruncationLength, Timer timer) {
            final long processedBytes = eventsReadAndTruncationLength.getTruncationLength();
            final long truncationOffset = this.segmentStartOffset.get() + processedBytes;
            return this.segment.truncate(truncationOffset, this.containerOperationTimeout)
                    .thenAccept(ignored -> {
                        this.segmentStartOffset.addAndGet(processedBytes);
                        this.metrics.batchProcessingLatency(timer.getElapsedMillis());
                    });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$ProcessorEventData.class */
    public static class ProcessorEventData {
        /** Upper bound (exclusive) on the size of an event payload plus its 4-byte prefix, as enforced by EventProcessorImpl.add. */
        public static final int MAX_EVENT_SIZE = 1048576;
        /** Serializer instance for this type. */
        private static final ProcessorEventDataSerializer SERIALIZER = new ProcessorEventDataSerializer();
        /** Payload of the event. */
        private final BufferView data;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$ProcessorEventData$ProcessorEventDataBuilder.class */
        /**
         * Builder for {@link ProcessorEventData}.
         */
        public static class ProcessorEventDataBuilder implements ObjectBuilder<ProcessorEventData> {

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            private BufferView data;

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            ProcessorEventDataBuilder() {
            }

            /**
             * Sets the payload of the event to build.
             *
             * @param bufferView the payload.
             * @return this builder.
             */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            public ProcessorEventDataBuilder data(BufferView bufferView) {
                this.data = bufferView;
                return this;
            }

            /**
             * Creates the event from the payload set so far.
             * Decompiler-renamed form of {@code build()}; kept because code in this file calls it.
             *
             * @return a new {@link ProcessorEventData}.
             */
            @SuppressFBWarnings(justification = "generated code")
            @Generated
            /* renamed from: build, reason: merged with bridge method [inline-methods] */
            public ProcessorEventData m11build() {
                return new ProcessorEventData(this.data);
            }

            /**
             * Creates the event from the payload set so far, under the method's original name.
             * NOTE(review): presumably this is the method required by ObjectBuilder - confirm and
             * add {@code @Override} if so.
             *
             * @return a new {@link ProcessorEventData}.
             */
            public ProcessorEventData build() {
                return m11build();
            }

            @SuppressFBWarnings(justification = "generated code")
            @Generated
            @Override
            public String toString() {
                return "ContainerEventProcessorImpl.ProcessorEventData.ProcessorEventDataBuilder(data=" + this.data + ")";
            }
        }

        /* loaded from: input_file:io/pravega/segmentstore/server/containers/ContainerEventProcessorImpl$ProcessorEventData$ProcessorEventDataSerializer.class */
        /**
         * Versioned serializer for {@link ProcessorEventData}. Declares a single format
         * (version 0, revision 0) consisting of the event payload.
         */
        static class ProcessorEventDataSerializer extends VersionedSerializer.WithBuilder<ProcessorEventData, ProcessorEventDataBuilder> {
            ProcessorEventDataSerializer() {
            }

            /**
             * Creates an empty builder.
             * Decompiler-renamed form of {@code newBuilder()}.
             *
             * @return a new builder.
             */
            /* JADX INFO: Access modifiers changed from: protected */
            /* renamed from: newBuilder, reason: merged with bridge method [inline-methods] */
            public ProcessorEventDataBuilder m12newBuilder() {
                return ProcessorEventData.builder();
            }

            /**
             * Creates an empty builder, under the method's original name and visibility.
             * NOTE(review): presumably this is the factory hook required by
             * VersionedSerializer.WithBuilder - confirm and add {@code @Override} if so.
             *
             * @return a new builder.
             */
            protected ProcessorEventDataBuilder newBuilder() {
                return m12newBuilder();
            }

            /** @return the version used when writing, which is 0. */
            protected byte getWriteVersion() {
                return (byte) 0;
            }

            /** Registers the reader and writer for version 0, revision 0. */
            protected void declareVersions() {
                version(0).revision(0, this::write00, this::read00);
            }

            /** Writes the event payload as a buffer. */
            private void write00(ProcessorEventData processorEventData, RevisionDataOutput revisionDataOutput) throws IOException {
                revisionDataOutput.writeBuffer(processorEventData.getData());
            }

            /** Reads the payload as a byte array and stores it in the builder wrapped in a ByteArraySegment. */
            private void read00(RevisionDataInput revisionDataInput, ProcessorEventDataBuilder processorEventDataBuilder) throws IOException {
                processorEventDataBuilder.data(new ByteArraySegment(revisionDataInput.readArray()));
            }
        }

        /**
         * Creates an event wrapping the given payload.
         *
         * @param bufferView the payload; stored as-is.
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"data"})
        ProcessorEventData(BufferView bufferView) {
            this.data = bufferView;
        }

        /** @return a new, empty {@link ProcessorEventDataBuilder}. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public static ProcessorEventDataBuilder builder() {
            return new ProcessorEventDataBuilder();
        }

        /** @return the payload of this event. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public BufferView getData() {
            return this.data;
        }

        /**
         * Two events are equal when both are {@link ProcessorEventData} instances that accept each
         * other via {@link #canEqual(Object)} and their payloads are equal (or both null).
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof ProcessorEventData) {
                final ProcessorEventData other = (ProcessorEventData) obj;
                if (other.canEqual(this)) {
                    final BufferView mine = getData();
                    final BufferView theirs = other.getData();
                    if (mine == null) {
                        return theirs == null;
                    }
                    return mine.equals(theirs);
                }
            }
            return false;
        }

        /** Lets subclasses veto equality with instances of this class; used by {@code equals}. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof ProcessorEventData;
        }

        /** Hash code derived from the payload only (43 stands in for a null payload). */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @Override
        public int hashCode() {
            final BufferView payload = getData();
            final int payloadHash = (payload == null) ? 43 : payload.hashCode();
            return 59 + payloadHash;
        }

        /** @return a debug representation that includes the payload's own string form. */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "ContainerEventProcessorImpl.ProcessorEventData(data=" + getData() + ")";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an instance whose processors are backed by segments of the given container; a
     * segment that does not exist yet is registered through the given metadata store.
     *
     * @param segmentContainer         container that owns the segments; its id identifies this instance.
     * @param metadataStore            used to register missing segments.
     * @param duration                 iteration delay handed to each processor.
     * @param duration2                timeout for container and segment operations.
     * @param scheduledExecutorService executor for asynchronous work.
     * @throws NullPointerException if any argument is null.
     */
    public ContainerEventProcessorImpl(@NonNull SegmentContainer segmentContainer, @NonNull MetadataStore metadataStore, @NonNull Duration duration, @NonNull Duration duration2, @NonNull ScheduledExecutorService scheduledExecutorService) {
        this(segmentContainer.getId(), getOrCreateInternalSegment(segmentContainer, metadataStore, duration2), duration, duration2, scheduledExecutorService);
        // Generated-style null checks; they can only run after the delegating constructor call.
        Preconditions.checkNotNull(segmentContainer, "container is marked non-null but is null");
        Preconditions.checkNotNull(metadataStore, "metadataStore is marked non-null but is null");
        Preconditions.checkNotNull(duration, "iterationDelay is marked non-null but is null");
        Preconditions.checkNotNull(duration2, "containerOperationTimeout is marked non-null but is null");
        Preconditions.checkNotNull(scheduledExecutorService, "executor is marked non-null but is null");
    }

    /**
     * Creates an instance that obtains the segment for each processor from the given supplier.
     *
     * @param i                        container id.
     * @param function                 maps a processor name to a future with its backing segment.
     * @param duration                 iteration delay handed to each processor.
     * @param duration2                timeout for segment operations.
     * @param scheduledExecutorService executor for asynchronous work.
     * @throws NullPointerException if any reference argument is null.
     */
    @VisibleForTesting
    ContainerEventProcessorImpl(int i, @NonNull Function<String, CompletableFuture<DirectSegmentAccess>> function, @NonNull Duration duration, @NonNull Duration duration2, @NonNull ScheduledExecutorService scheduledExecutorService) {
        this.eventProcessorMap = new ConcurrentHashMap<>();
        Preconditions.checkNotNull(function, "segmentSupplier is marked non-null but is null");
        Preconditions.checkNotNull(duration, "iterationDelay is marked non-null but is null");
        Preconditions.checkNotNull(duration2, "containerOperationTimeout is marked non-null but is null");
        Preconditions.checkNotNull(scheduledExecutorService, "executor is marked non-null but is null");

        this.containerId = i;
        this.traceObjectId = String.format("ContainerEventProcessor[%d]", i);
        this.segmentSupplier = function;
        this.iterationDelay = duration;
        this.containerOperationTimeout = duration2;
        this.closed = new AtomicBoolean(false);
        this.executor = scheduledExecutorService;
    }

    /**
     * Builds the function that resolves a processor name to its backing segment. The function
     * first asks the container for the segment; if that fails with
     * {@link StreamSegmentNotExistsException}, it registers the segment as a pinned segment of
     * type {@code SYSTEM_CRITICAL_SEGMENT} and asks the container again.
     *
     * @param segmentContainer container that owns the segments.
     * @param metadataStore    used to register a missing segment.
     * @param duration         timeout for each container and metadata store call.
     * @return the name-to-segment function.
     */
    private static Function<String, CompletableFuture<DirectSegmentAccess>> getOrCreateInternalSegment(SegmentContainer segmentContainer, MetadataStore metadataStore, Duration duration) {
        return processorName -> Futures.exceptionallyComposeExpecting(
                segmentContainer.forSegment(NameUtils.getEventProcessorSegmentName(segmentContainer.getId(), processorName), duration),
                failure -> failure instanceof StreamSegmentNotExistsException,
                () -> metadataStore
                        .registerPinnedSegment(NameUtils.getEventProcessorSegmentName(segmentContainer.getId(), processorName), SYSTEM_CRITICAL_SEGMENT, null, duration)
                        .thenCompose(ignored -> segmentContainer.forSegment(NameUtils.getEventProcessorSegmentName(segmentContainer.getId(), processorName), duration)));
    }

    /**
     * Closes this service and every registered processor. Only the first invocation has any effect.
     */
    @Override
    public void close() {
        // Only the caller that flips the flag from false to true performs the shutdown.
        if (this.closed.compareAndSet(false, true)) {
            log.info("{}: Closing ContainerEventProcessor service.", this.traceObjectId);
            closeProcessors();
        }
    }

    /**
     * Closes every registered processor, waiting for each one's creation future, and then empties
     * the registry. A failure while closing one processor is logged and does not prevent the
     * remaining ones from being closed. Does nothing if no processor is registered.
     */
    private void closeProcessors() {
        synchronized (this.eventProcessorMap) {
            if (!this.eventProcessorMap.isEmpty()) {
                this.eventProcessorMap.forEach((processorName, processorFuture) -> {
                    try {
                        final ContainerEventProcessor.EventProcessor processor = processorFuture.join();
                        processor.close();
                    } catch (Exception e) {
                        log.warn("{}: Problem closing EventProcessor {}.", this.traceObjectId, processorName, e);
                    }
                });
                this.eventProcessorMap.clear();
                log.debug("{}: Closing EventProcessors complete.", this.traceObjectId);
            }
        }
    }

    /**
     * Returns the processor registered under the given name, creating and starting it with the
     * given handler and configuration if none is registered yet.
     *
     * @param str                  processor name; must not be empty.
     * @param function             handler invoked with each batch of events.
     * @param eventProcessorConfig limits for the processor.
     * @return a future for the (possibly still initializing) processor.
     * @throws NullPointerException     if any argument is null.
     * @throws IllegalArgumentException if {@code str} is empty.
     */
    @Override
    public CompletableFuture<ContainerEventProcessor.EventProcessor> forConsumer(@NonNull String str, @NonNull Function<List<BufferView>, CompletableFuture<Void>> function, @NonNull ContainerEventProcessor.EventProcessorConfig eventProcessorConfig) {
        Preconditions.checkNotNull(str, "name is marked non-null but is null");
        Preconditions.checkNotNull(function, "handler is marked non-null but is null");
        Preconditions.checkNotNull(eventProcessorConfig, "config is marked non-null but is null");
        checkEventProcessorCreatePreconditions(str);
        synchronized (this.eventProcessorMap) {
            final CompletableFuture<ContainerEventProcessor.EventProcessor> existing = this.eventProcessorMap.get(str);
            if (existing != null) {
                return existing;
            }
            // Register the placeholder first so that concurrent callers share the same future.
            final CompletableFuture<ContainerEventProcessor.EventProcessor> created = new CompletableFuture<>();
            this.eventProcessorMap.put(str, created);
            createEventProcessor(str, eventProcessorConfig, () -> this.eventProcessorMap.remove(str), function, true, created);
            return created;
        }
    }

    /**
     * Returns the processor registered under the given name, creating it if none is registered
     * yet. A processor created here is not started, uses a handler that does nothing, and is
     * configured with {@code EventProcessorConfig(0, Long.MAX_VALUE)}.
     *
     * @param str processor name; must not be empty.
     * @return a future for the (possibly still initializing) processor.
     * @throws NullPointerException     if {@code str} is null.
     * @throws IllegalArgumentException if {@code str} is empty.
     */
    @Override
    public CompletableFuture<ContainerEventProcessor.EventProcessor> forDurableQueue(@NonNull String str) {
        Preconditions.checkNotNull(str, "name is marked non-null but is null");
        checkEventProcessorCreatePreconditions(str);
        synchronized (this.eventProcessorMap) {
            final CompletableFuture<ContainerEventProcessor.EventProcessor> existing = this.eventProcessorMap.get(str);
            if (existing != null) {
                return existing;
            }
            final Function<List<BufferView>, CompletableFuture<Void>> noOpHandler = events -> CompletableFuture.completedFuture(null);
            final ContainerEventProcessor.EventProcessorConfig queueConfig = new ContainerEventProcessor.EventProcessorConfig(0, Long.MAX_VALUE);
            // Register the placeholder first so that concurrent callers share the same future.
            final CompletableFuture<ContainerEventProcessor.EventProcessor> created = new CompletableFuture<>();
            this.eventProcessorMap.put(str, created);
            createEventProcessor(str, queueConfig, () -> this.eventProcessorMap.remove(str), noOpHandler, false, created);
            return created;
        }
    }

    /**
     * Verifies that a processor may be created: this service must not be closed and the name
     * must not be empty.
     *
     * @param str processor name.
     */
    private void checkEventProcessorCreatePreconditions(String str) {
        final boolean alreadyClosed = this.closed.get();
        Exceptions.checkNotClosed(alreadyClosed, this);
        final boolean hasName = !str.isEmpty();
        Preconditions.checkArgument(hasName, "EventProcessor name cannot be empty.");
    }

    /**
     * Obtains the backing segment for the named processor, instantiates the processor on top of
     * it and reports the outcome through the given future.
     *
     * <p>On success the given future is completed with the processor. On failure it is completed
     * exceptionally and the name is removed from the registry.
     *
     * @param str                  processor name.
     * @param eventProcessorConfig limits for the processor.
     * @param runnable             callback the processor runs when it is closed.
     * @param function             handler invoked with each batch of events.
     * @param z                    whether to start the processor and wait until it is running.
     * @param completableFuture    future (already registered under {@code str}) to complete with the outcome.
     * @return a future that completes with the processor, or fails with the creation error.
     */
    private CompletableFuture<ContainerEventProcessor.EventProcessor> createEventProcessor(String str, ContainerEventProcessor.EventProcessorConfig eventProcessorConfig, Runnable runnable, Function<List<BufferView>, CompletableFuture<Void>> function, boolean z, CompletableFuture<ContainerEventProcessor.EventProcessor> completableFuture) {
        return this.segmentSupplier.apply(str)
                // Explicit type witness: the lambda returns the implementation type, while the
                // method's result is declared in terms of the interface.
                .<ContainerEventProcessor.EventProcessor>thenApply(directSegmentAccess -> {
                    Exceptions.checkNotClosed(this.closed.get(), this);
                    EventProcessorImpl eventProcessorImpl = new EventProcessorImpl(str, this.containerId, directSegmentAccess, function, eventProcessorConfig, this.iterationDelay, this.containerOperationTimeout, runnable, this.executor);
                    if (z) {
                        eventProcessorImpl.startAsync().awaitRunning();
                    }
                    return eventProcessorImpl;
                })
                .whenComplete((eventProcessor, th) -> {
                    if (th == null) {
                        log.info("{}: EventProcessor {} created successfully.", this.traceObjectId, str);
                        completableFuture.complete(eventProcessor);
                    } else {
                        log.error("{}: Problem instantiating EventProcessor {}.", this.traceObjectId, str, th);
                        completableFuture.completeExceptionally(th);
                        // Drop the failed placeholder so that a later call can retry the creation.
                        this.eventProcessorMap.remove(str);
                    }
                });
    }

    /** @return the live registry of processors keyed by name (the internal map itself, not a copy). */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    Map<String, CompletableFuture<ContainerEventProcessor.EventProcessor>> getEventProcessorMap() {
        return this.eventProcessorMap;
    }
}
