package tech.ydb.topic.read.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.topic.description.OffsetsRange;
import tech.ydb.topic.read.DeferredCommitter;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.impl.events.DataReceivedEventImpl;

/* loaded from: input_file:tech/ydb/topic/read/impl/DeferredCommitterImpl.class */
public class DeferredCommitterImpl implements DeferredCommitter {
    private static final Logger logger = LoggerFactory.getLogger(DeferredCommitterImpl.class);
    private final Map<PartitionSessionImpl, PartitionRanges> rangesByPartition = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tech/ydb/topic/read/impl/DeferredCommitterImpl$PartitionRanges.class */
    public static class PartitionRanges {
        private final PartitionSessionImpl partitionSession;
        private final DisjointOffsetRangeSet ranges;
        private final ReentrantLock rangesLock;

        private PartitionRanges(PartitionSessionImpl partitionSessionImpl) {
            this.ranges = new DisjointOffsetRangeSet();
            this.rangesLock = new ReentrantLock();
            this.partitionSession = partitionSessionImpl;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void add(OffsetsRange offsetsRange) {
            try {
                this.rangesLock.lock();
                try {
                    this.ranges.add(offsetsRange);
                    this.rangesLock.unlock();
                } catch (Throwable th) {
                    this.rangesLock.unlock();
                    throw th;
                }
            } catch (RuntimeException e) {
                String str = "Error adding new offset range to DeferredCommitter for partition session " + this.partitionSession.getId() + " (partition " + this.partitionSession.getPartitionId() + "): " + e.getMessage();
                DeferredCommitterImpl.logger.error(str);
                throw new RuntimeException(str, e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void commit() {
            this.rangesLock.lock();
            try {
                this.partitionSession.commitOffsetRanges(this.ranges.getRangesAndClear());
            } finally {
                this.rangesLock.unlock();
            }
        }
    }

    @Override // tech.ydb.topic.read.MessageAccumulator
    public void add(Message message) {
        MessageImpl messageImpl = (MessageImpl) message;
        this.rangesByPartition.computeIfAbsent(messageImpl.getPartitionSessionImpl(), partitionSessionImpl -> {
            return new PartitionRanges(partitionSessionImpl);
        }).add(messageImpl.getOffsetsToCommit());
    }

    @Override // tech.ydb.topic.read.MessageAccumulator
    public void add(DataReceivedEvent dataReceivedEvent) {
        DataReceivedEventImpl dataReceivedEventImpl = (DataReceivedEventImpl) dataReceivedEvent;
        this.rangesByPartition.computeIfAbsent(dataReceivedEventImpl.getPartitionSessionImpl(), partitionSessionImpl -> {
            return new PartitionRanges(partitionSessionImpl);
        }).add(dataReceivedEventImpl.getOffsetsToCommit());
    }

    @Override // tech.ydb.topic.read.DeferredCommitter
    public void commit() {
        this.rangesByPartition.forEach((partitionSessionImpl, partitionRanges) -> {
            partitionRanges.commit();
        });
    }
}
