package io.pravega.segmentstore.server.containers;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BufferView;
import io.pravega.segmentstore.server.ContainerEventProcessor;
import io.pravega.segmentstore.storage.chunklayer.AbstractTaskQueueManager;
import io.pravega.segmentstore.storage.chunklayer.GarbageCollector;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/containers/StorageEventProcessor.class */
/**
 * {@link AbstractTaskQueueManager} for {@link GarbageCollector.TaskInfo} items that is backed by a
 * {@link ContainerEventProcessor}.
 *
 * <p>Each queue registered through {@link #addQueue} is represented by one
 * {@link ContainerEventProcessor.EventProcessor}, kept in a map keyed by queue name. Tasks are serialized with
 * {@link GarbageCollector.TaskInfo.Serializer} before being added to a queue, and deserialized again in
 * {@link #processEvents} before being handed, as a batch, to the callback supplied at construction.
 */
public class StorageEventProcessor implements AbstractTaskQueueManager<GarbageCollector.TaskInfo> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(StorageEventProcessor.class);
    private static final GarbageCollector.TaskInfo.Serializer SERIALIZER = new GarbageCollector.TaskInfo.Serializer();
    // Third argument of every EventProcessorConfig created by addQueue (128 KiB).
    // NOTE(review): presumably the amount of processed data after which the queue is truncated - confirm.
    private static final long CONTAINER_EVENT_PROCESSOR_TRUNCATE_DATA_SIZE = 128 * 1024;
    private final int containerID;
    private final ContainerEventProcessor eventProcessor;
    private final Function<List<GarbageCollector.TaskInfo>, CompletableFuture<Void>> callBack;
    private final int maxItemsAtOnce;
    private final String traceObjectId;
    // Queue name -> processor registered for it by addQueue.
    private final ConcurrentHashMap<String, ContainerEventProcessor.EventProcessor> eventProcessorMap = new ConcurrentHashMap<>();

    /**
     * Creates a new instance.
     *
     * @param containerID    Id of the container; used to build the prefix of this instance's log messages.
     * @param eventProcessor {@link ContainerEventProcessor} used to create the per-queue processors. Must not be null.
     * @param callBack       Function invoked by {@link #processEvents} with each deserialized batch. Must not be null.
     * @param maxItemsAtOnce First argument of the {@link ContainerEventProcessor.EventProcessorConfig} used for
     *                       consumer queues (presumably the maximum batch size - confirm).
     * @throws NullPointerException If {@code eventProcessor} or {@code callBack} is null.
     */
    public StorageEventProcessor(int containerID, ContainerEventProcessor eventProcessor,
                                 Function<List<GarbageCollector.TaskInfo>, CompletableFuture<Void>> callBack,
                                 int maxItemsAtOnce) {
        this.containerID = containerID;
        this.eventProcessor = Preconditions.checkNotNull(eventProcessor, "eventProcessor");
        this.callBack = Preconditions.checkNotNull(callBack, "callBack");
        this.maxItemsAtOnce = maxItemsAtOnce;
        this.traceObjectId = String.format("StorageEventProcessor[%d]", containerID);
    }

    /**
     * Registers a queue under the given name.
     *
     * <p>When {@code durableQueueOnly} is true the processor is obtained from
     * {@link ContainerEventProcessor#forDurableQueue}; otherwise it is obtained from
     * {@link ContainerEventProcessor#forConsumer} with {@link #processEvents} as the handler. In both cases the
     * resulting processor is stored under {@code queueName} once the underlying future completes, replacing any
     * processor previously stored under that name.
     *
     * @param queueName        Name of the queue. Must not be null.
     * @param durableQueueOnly Selects which of the two factory methods above is used. Must not be null.
     * @return A future that completes once the processor has been created and recorded.
     * @throws NullPointerException If either argument is null.
     */
    public CompletableFuture<Void> addQueue(String queueName, Boolean durableQueueOnly) {
        Preconditions.checkNotNull(queueName, "queueName");
        // Fail with a named precondition rather than an anonymous NPE from auto-unboxing.
        Preconditions.checkNotNull(durableQueueOnly, "durableQueueOnly");
        // The config is only constructed on the consumer path, exactly where it is needed.
        return (durableQueueOnly
                ? this.eventProcessor.forDurableQueue(queueName)
                : this.eventProcessor.forConsumer(queueName, this::processEvents,
                        new ContainerEventProcessor.EventProcessorConfig(
                                this.maxItemsAtOnce, Long.MAX_VALUE, CONTAINER_EVENT_PROCESSOR_TRUNCATE_DATA_SIZE)))
                .thenAccept(processor -> this.eventProcessorMap.put(queueName, processor));
    }

    /**
     * Serializes the given task and adds it to the named queue.
     *
     * <p>Any failure raised after the null checks (including an unknown queue name or a serialization error) is
     * reported through the returned future rather than thrown.
     *
     * @param queueName Name of a queue previously registered with {@link #addQueue}. Must not be null.
     * @param task      Task to add. Must not be null.
     * @return A future that completes when the add operation completes, or a failed future describing the error.
     * @throws NullPointerException If either argument is null.
     */
    public CompletableFuture<Void> addTask(String queueName, GarbageCollector.TaskInfo task) {
        Preconditions.checkNotNull(queueName, "queueName");
        Preconditions.checkNotNull(task, "task");
        try {
            ContainerEventProcessor.EventProcessor processor = this.eventProcessorMap.get(queueName);
            Preconditions.checkArgument(null != processor, "Attempt to add to non existent queue (%s).", queueName);
            // NOTE(review): the Duration is presumably the timeout of the add operation - confirm.
            return Futures.toVoid(processor.add(SERIALIZER.serialize(task), Duration.ofMillis(1000L)));
        } catch (Throwable ex) {
            return CompletableFuture.failedFuture(ex);
        }
    }

    /**
     * Closes every registered processor. A failure to close one processor is logged and does not prevent the
     * remaining processors from being closed.
     */
    public void close() throws Exception {
        for (Map.Entry<String, ContainerEventProcessor.EventProcessor> entry : this.eventProcessorMap.entrySet()) {
            try {
                entry.getValue().close();
            } catch (Exception e) {
                // Trailing Throwable argument is logged by SLF4J as the exception (with stack trace).
                log.error("{}: Error while closing event processor name={}.", this.traceObjectId, entry.getKey(), e);
            }
        }
    }

    /**
     * Deserializes a batch of events into tasks and passes them to the callback.
     *
     * @param list Serialized events to process. Must not be null.
     * @return The future returned by the callback, or a failed future if any event cannot be deserialized (in
     *         which case the callback is not invoked at all).
     * @throws NullPointerException If {@code list} is null.
     */
    CompletableFuture<Void> processEvents(List<BufferView> list) {
        Preconditions.checkNotNull(list, "events");
        log.debug("{}: processEvents called with {} events", this.traceObjectId, list.size());
        List<GarbageCollector.TaskInfo> batch = new ArrayList<>(list.size());
        for (BufferView event : list) {
            try {
                batch.add((GarbageCollector.TaskInfo) SERIALIZER.deserialize(event));
            } catch (IOException e) {
                log.error("{}: processEvents failed while deserializing batch.", this.traceObjectId, e);
                return CompletableFuture.failedFuture(e);
            }
        }
        return this.callBack.apply(batch);
    }

    /**
     * Returns the live map of queue name to processor (not a copy).
     *
     * @return The internal processor map.
     */
    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public ConcurrentHashMap<String, ContainerEventProcessor.EventProcessor> getEventProcessorMap() {
        return this.eventProcessorMap;
    }
}
