package org.opendaylight.controller.akka.segjournal;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.JournalWriter;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor.class */
public abstract class SegmentedJournalActor extends AbstractActor {
    // Logger shared by this actor and its nested subclasses.
    private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
    // Serialization namespace used for the "delete" journal opened in ensureOpen(); registers Long entries.
    private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder().register(new LongSerdes(), new Class[]{Long.class}).build();
    // Maximum segment size passed to the "delete" journal builder in ensureOpen().
    private static final int DELETE_SEGMENT_SIZE = 65536;
    // Identifier used as the prefix of every log message and handed to the data journal.
    private final String persistenceId;
    // Storage level handed to the data journal when it is created.
    private final StorageLevel storage;
    // Maximum segment size handed to the data journal when it is created.
    private final int maxSegmentSize;
    // Maximum entry size handed to the data journal when it is created.
    private final int maxEntrySize;
    // Directory holding the journals; its name is also part of the metric name prefix built in preStart().
    private final File directory;
    // Metrics below are looked up in preStart().
    // Updated in handleWriteMessages() with the time spent writing one WriteMessages batch.
    private Timer batchWriteTime;
    // Marked in handleWriteMessages() with the advance of the last written sequence number.
    private Meter messageWriteCount;
    // Handed to the data journal when it is created.
    private Histogram messageSize;
    // Updated in flushJournal() with the number of messages covered by a flush.
    private Histogram flushMessages;
    // Updated in flushJournal() with the number of bytes covered by a flush.
    private Histogram flushBytes;
    // Updated in flushJournal() with the duration of a flush.
    private Timer flushTime;
    // Created lazily by ensureOpen(), closed and reset to null in postStop().
    private DataJournal dataJournal;
    // Created lazily by ensureOpen(), closed and reset to null in postStop(); stores the deletion watermark.
    private SegmentedJournal<Long> deleteJournal;
    // Deletion watermark: read from the delete journal in ensureOpen(), advanced in handleDeleteMessagesTo().
    private long lastDelete;

    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$AsyncMessage.class */
    /**
     * Base class of messages which carry a {@link Promise} through which the outcome of the request is reported.
     *
     * @param <T> type of the value the promise is completed with
     */
    abstract static class AsyncMessage<T> {
        /** Promise created together with the message. */
        final Promise<T> promise = Promise.apply();
    }

    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$Delayed.class */
    /**
     * Actor flavor which defers journal flushes. Completed writes are queued; the journal is flushed either as soon
     * as at least {@code maxUnflushedBytes} bytes are queued, or when the {@link Flush} message this actor sends to
     * itself at the start of a batch is processed while that batch is still the current one.
     */
    private static final class Delayed extends SegmentedJournalActor {
        /** Self-addressed request to flush the batch with the given number. */
        private static final class Flush extends AsyncMessage<Void> {
            final long batch;

            Flush(long j) {
                batch = j;
            }
        }

        // Writes which reached the journal but have not been flushed yet, in arrival order
        private final ArrayDeque<WrittenMessages> unflushedWrites = new ArrayDeque<>();
        // Runs from the first queued write of a batch until that batch is flushed
        private final Stopwatch unflushedDuration = Stopwatch.createUnstarted();
        // Byte threshold at which a flush is performed immediately
        private final long maxUnflushedBytes;
        // Number of the most recently started deferred batch
        private long batch;
        // Sum of writtenBytes over unflushedWrites
        private long unflushedBytes;

        /**
         * Creates the actor; the first five arguments are passed to the superclass unchanged.
         *
         * @param i3 number of unflushed bytes which triggers an immediate flush
         */
        Delayed(String str, File file, StorageLevel storageLevel, int i, int i2, int i3) {
            super(str, file, storageLevel, i, i2);
            maxUnflushedBytes = i3;
        }

        @Override
        ReceiveBuilder addMessages(ReceiveBuilder receiveBuilder) {
            return super.addMessages(receiveBuilder)
                .match(Flush.class, this::handleFlush);
        }

        /** Flushes queued writes if the request still refers to the current batch, otherwise only logs. */
        private void handleFlush(Flush flush) {
            if (flush.batch != batch) {
                // A newer batch has been started since this request was sent
                LOG.debug("{}: batch {} not flushed by {}", persistenceId(), batch, flush.batch);
                return;
            }
            flushWrites();
        }

        @Override
        void onWrittenMessages(WrittenMessages writtenMessages) {
            final boolean startsBatch = unflushedWrites.isEmpty();
            if (startsBatch) {
                unflushedDuration.start();
            }

            unflushedWrites.addLast(writtenMessages);
            unflushedBytes += writtenMessages.writtenBytes;

            if (unflushedBytes >= maxUnflushedBytes) {
                LOG.debug("{}: reached {} unflushed journal bytes", persistenceId(), unflushedBytes);
                flushWrites();
                return;
            }

            if (startsBatch) {
                // First write of a batch below the threshold: ask ourselves to flush it later
                LOG.debug("{}: deferring journal flush", persistenceId());
                self().tell(new Flush(++batch), ActorRef.noSender());
            }
        }

        @Override
        void flushWrites() {
            final int pending = unflushedWrites.size();
            if (pending == 0) {
                // Nothing queued
                return;
            }

            LOG.debug("{}: flushing {} journal writes after {}", persistenceId(), pending, unflushedDuration.stop());
            flushJournal(unflushedBytes, pending);

            // Report results only after the flush has gone through
            final Stopwatch completion = Stopwatch.createStarted();
            unflushedWrites.forEach(WrittenMessages::complete);
            unflushedWrites.clear();
            unflushedBytes = 0L;
            unflushedDuration.reset();
            LOG.debug("{}: completed {} flushed journal writes in {}", persistenceId(), pending, completion);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$DeleteMessagesTo.class */
    public static final class DeleteMessagesTo extends AsyncMessage<Void> {
        final long toSequenceNr;

        DeleteMessagesTo(long j) {
            this.toSequenceNr = j;
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("toSequenceNr", this.toSequenceNr).toString();
        }
    }

    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$Immediate.class */
    /** Actor flavor which flushes the journal after every write and reports the results right away. */
    private static final class Immediate extends SegmentedJournalActor {
        /** Creates the actor; all arguments are passed to the superclass unchanged. */
        Immediate(String str, File file, StorageLevel storageLevel, int i, int i2) {
            super(str, file, storageLevel, i, i2);
        }

        @Override
        void onWrittenMessages(WrittenMessages writtenMessages) {
            // Each write is accounted for as a flush of exactly one message
            final long bytes = writtenMessages.writtenBytes;
            flushJournal(bytes, 1);
            writtenMessages.complete();
        }

        @Override
        void flushWrites() {
            // Nothing is ever left unflushed by onWrittenMessages()
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$ReadHighestSequenceNr.class */
    public static final class ReadHighestSequenceNr extends AsyncMessage<Long> {
        private final long fromSequenceNr;

        ReadHighestSequenceNr(long j) {
            this.fromSequenceNr = j;
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("fromSequenceNr", this.fromSequenceNr).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$ReplayMessages.class */
    public static final class ReplayMessages extends AsyncMessage<Void> {
        private final long fromSequenceNr;
        final long toSequenceNr;
        final long max;
        final Consumer<PersistentRepr> replayCallback;

        ReplayMessages(long j, long j2, long j3, Consumer<PersistentRepr> consumer) {
            this.fromSequenceNr = j;
            this.toSequenceNr = j2;
            this.max = j3;
            this.replayCallback = (Consumer) Objects.requireNonNull(consumer);
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("fromSequenceNr", this.fromSequenceNr).add("toSequenceNr", this.toSequenceNr).add("max", this.max).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$WriteMessages.class */
    /**
     * Batch of {@link AtomicWrite} requests. Each request added gets its own promise, stored at the same index,
     * through which the outcome of that individual write is reported.
     */
    public static final class WriteMessages {
        // Parallel lists: results.get(i) belongs to requests.get(i)
        private final List<AtomicWrite> requests = new ArrayList<>();
        private final List<Promise<Optional<Exception>>> results = new ArrayList<>();

        /**
         * Appends a request to the batch.
         *
         * @param atomicWrite request to add
         * @return future of the promise associated with the request
         */
        public Future<Optional<Exception>> add(AtomicWrite atomicWrite) {
            final Promise<Optional<Exception>> promise = Promise.apply();
            requests.add(atomicWrite);
            results.add(promise);
            return promise.future();
        }

        /**
         * Returns the number of requests in the batch.
         *
         * @return number of requests
         */
        public int size() {
            return requests.size();
        }

        /**
         * Returns the request at the given position.
         *
         * @param i index of the request
         * @return request at that index
         */
        public AtomicWrite getRequest(int i) {
            return requests.get(i);
        }

        /**
         * Reports a failed write. The promise is still completed successfully; the failure travels as the
         * value inside the {@link Optional}.
         *
         * @param i index of the request
         * @param exc failure to report
         */
        void setFailure(int i, Exception exc) {
            results.get(i).success(Optional.of(exc));
        }

        /**
         * Reports a successful write by completing the promise with an empty {@link Optional}.
         *
         * @param i index of the request
         */
        void setSuccess(int i) {
            results.get(i).success(Optional.empty());
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("requests", requests).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/opendaylight/controller/akka/segjournal/SegmentedJournalActor$WrittenMessages.class */
    /**
     * Outcome of writing one {@link WriteMessages} batch.
     *
     * <p>Declared as a proper record: a class cannot extend {@link Record} explicitly, and the record's
     * {@code toString()}, {@code hashCode()}, {@code equals()} and accessors are supplied by the compiler.
     *
     * @param message the batch which was written
     * @param responses one entry per request in {@code message}; an {@link Exception} entry marks a failed write
     * @param writtenBytes number of bytes written, never negative
     */
    public record WrittenMessages(WriteMessages message, List<Object> responses, long writtenBytes) {
        /** Checks that there is exactly one response per request and that the byte count is not negative. */
        public WrittenMessages {
            Verify.verify(responses.size() == message.size(), "Mismatched %s and %s", message, responses);
            Verify.verify(writtenBytes >= 0, "Unexpected length %s", writtenBytes);
        }

        /** Reports every response to the originating batch: exceptions as failures, anything else as success. */
        private void complete() {
            final int count = responses.size();
            for (int i = 0; i < count; i++) {
                if (responses.get(i) instanceof Exception failure) {
                    message.setFailure(i, failure);
                } else {
                    message.setSuccess(i);
                }
            }
        }
    }

    /**
     * Stores the configuration; no journal is opened here.
     *
     * @param str persistence identifier, must not be null
     * @param file journal directory, must not be null
     * @param storageLevel storage level, must not be null
     * @param i maximum entry size
     * @param i2 maximum segment size
     */
    private SegmentedJournalActor(String str, File file, StorageLevel storageLevel, int i, int i2) {
        persistenceId = Objects.requireNonNull(str);
        directory = Objects.requireNonNull(file);
        storage = Objects.requireNonNull(storageLevel);
        maxEntrySize = i;
        maxSegmentSize = i2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the {@link Props} of a journal actor. A positive {@code i3} selects the flavor which defers
     * flushes, anything else selects the flavor which flushes after every write.
     *
     * @param str persistence identifier, must not be null
     * @param file journal directory
     * @param storageLevel storage level
     * @param i maximum entry size
     * @param i2 maximum segment size
     * @param i3 unflushed-bytes threshold of the deferring flavor
     * @return actor {@link Props}
     */
    public static Props props(String str, File file, StorageLevel storageLevel, int i, int i2, int i3) {
        final String id = Objects.requireNonNull(str);
        if (i3 > 0) {
            return Props.create(Delayed.class, id, file, storageLevel, i, i2, i3);
        }
        return Props.create(Immediate.class, id, file, storageLevel, i, i2);
    }

    /**
     * Returns the persistence identifier this actor was created with.
     *
     * @return persistence identifier
     */
    final String persistenceId() {
        return persistenceId;
    }

    /**
     * Flushes the data journal and records the flush in the metrics.
     *
     * @param j number of bytes recorded in the {@code flushBytes} histogram
     * @param i number of messages recorded in the {@code flushMessages} histogram
     */
    final void flushJournal(long j, int i) {
        final Stopwatch timer = Stopwatch.createStarted();
        dataJournal.flush();
        timer.stop();
        LOG.debug("{}: journal flush completed in {}", persistenceId, timer);

        flushBytes.update(j);
        flushMessages.update(i);
        final long elapsedNanos = timer.elapsed(TimeUnit.NANOSECONDS);
        flushTime.update(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Builds the message dispatch: the handlers contributed by {@link #addMessages(ReceiveBuilder)} first, then a
     * catch-all which logs anything else.
     */
    public AbstractActor.Receive createReceive() {
        return addMessages(receiveBuilder())
            .matchAny(this::handleUnknown)
            .build();
    }

    /**
     * Registers the handlers of the four request types understood by every flavor of this actor.
     *
     * @param receiveBuilder builder to add the handlers to
     * @return the result of the chained {@code match()} calls
     */
    ReceiveBuilder addMessages(ReceiveBuilder receiveBuilder) {
        return receiveBuilder
            .match(DeleteMessagesTo.class, this::handleDeleteMessagesTo)
            .match(ReadHighestSequenceNr.class, this::handleReadHighestSequenceNr)
            .match(ReplayMessages.class, this::handleReplayMessages)
            .match(WriteMessages.class, this::handleWriteMessages);
    }

    /**
     * Looks up the metrics of this actor. Metric names are prefixed with the parent actor's path (without address)
     * followed by the name of the journal directory.
     */
    public void preStart() throws Exception {
        LOG.debug("{}: actor starting", persistenceId);
        super.preStart();

        final MetricRegistry registry =
            MetricsReporter.getInstance("org.opendaylight.controller.actor.metric").getMetricsRegistry();
        final String prefix = self().path().parent().toStringWithoutAddress() + "/" + directory.getName();

        batchWriteTime = registry.timer(MetricRegistry.name(prefix, "batchWriteTime"));
        messageWriteCount = registry.meter(MetricRegistry.name(prefix, "messageWriteCount"));
        messageSize = registry.histogram(MetricRegistry.name(prefix, "messageSize"));
        flushBytes = registry.histogram(MetricRegistry.name(prefix, "flushBytes"));
        flushMessages = registry.histogram(MetricRegistry.name(prefix, "flushMessages"));
        flushTime = registry.timer(MetricRegistry.name(prefix, "flushTime"));
    }

    /**
     * Closes whichever journals are open and then runs the superclass hook.
     *
     * <p>The steps are chained with {@code finally} so that a failure to close the data journal neither leaves the
     * delete journal open nor skips {@code super.postStop()}. The exception still propagates to the caller; if a
     * later step also throws, that later exception is the one propagated.
     */
    public void postStop() throws Exception {
        LOG.debug("{}: actor stopping", persistenceId);
        try {
            if (dataJournal != null) {
                dataJournal.close();
                LOG.debug("{}: data journal closed", persistenceId);
            }
        } finally {
            dataJournal = null;
            try {
                if (deleteJournal != null) {
                    deleteJournal.close();
                    LOG.debug("{}: delete journal closed", persistenceId);
                }
            } finally {
                deleteJournal = null;
                LOG.debug("{}: actor stopped", persistenceId);
                super.postStop();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a {@link DeleteMessagesTo} request.
     *
     * @param j sequence number up to which entries should be deleted
     * @return the new request
     */
    public static AsyncMessage<Void> deleteMessagesTo(final long j) {
        return new DeleteMessagesTo(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a {@link ReadHighestSequenceNr} request.
     *
     * @param j value stored as the request's {@code fromSequenceNr}
     * @return the new request
     */
    public static AsyncMessage<Long> readHighestSequenceNr(final long j) {
        return new ReadHighestSequenceNr(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a {@link ReplayMessages} request.
     *
     * @param j requested first sequence number
     * @param j2 requested last sequence number
     * @param j3 value stored as the request's {@code max}
     * @param consumer replay callback, must not be null
     * @return the new request
     */
    public static AsyncMessage<Void> replayMessages(final long j, final long j2, final long j3,
            final Consumer<PersistentRepr> consumer) {
        return new ReplayMessages(j, j2, j3, consumer);
    }

    /**
     * Deletes journal entries up to the requested sequence number, capped at the last written sequence number.
     * Pending writes are flushed first. If the capped target lies beyond the current deletion watermark, the new
     * watermark is appended to and committed in the delete journal, applied to the data journal, and both journals
     * are compacted. The request's promise is then completed with {@code null}.
     *
     * @param deleteMessagesTo the request
     */
    private void handleDeleteMessagesTo(DeleteMessagesTo deleteMessagesTo) {
        ensureOpen();

        LOG.debug("{}: delete messages {}", persistenceId, deleteMessagesTo);
        flushWrites();

        final long to = Long.min(dataJournal.lastWrittenSequenceNr(), deleteMessagesTo.toSequenceNr);
        LOG.debug("{}: adjusted delete to {}", persistenceId, to);

        if (lastDelete < to) {
            LOG.debug("{}: deleting entries up to {}", persistenceId, to);

            lastDelete = to;
            // Record the new watermark in the delete journal before touching the data journal
            final JournalWriter<Long> deleteWriter = deleteJournal.writer();
            final Indexed<Long> entry = deleteWriter.append(Long.valueOf(lastDelete));
            deleteWriter.commit(entry.index());
            dataJournal.deleteTo(lastDelete);

            LOG.debug("{}: compaction started", persistenceId);
            dataJournal.compactTo(lastDelete);
            deleteJournal.compact(entry.index());
            LOG.debug("{}: compaction finished", persistenceId);
        } else {
            LOG.debug("{}: entries up to {} already deleted", persistenceId, lastDelete);
        }

        // Promise<Void> can only be completed with null; no cast is needed (or valid)
        deleteMessagesTo.promise.success(null);
    }

    /**
     * Answers with the last written sequence number of the data journal. If the journal directory does not exist
     * the journal is not opened and the answer is {@code 0}.
     *
     * @param readHighestSequenceNr the request
     */
    private void handleReadHighestSequenceNr(ReadHighestSequenceNr readHighestSequenceNr) {
        LOG.debug("{}: looking for highest sequence on {}", persistenceId, readHighestSequenceNr);

        final long sequence;
        if (directory.isDirectory()) {
            ensureOpen();
            flushWrites();
            sequence = dataJournal.lastWrittenSequenceNr();
        } else {
            // No directory means nothing was ever written; do not create the journal just to read from it
            sequence = 0L;
        }

        // Prefix with persistenceId like every other log line (previously the message object was used)
        LOG.debug("{}: highest sequence is {}", persistenceId, sequence);
        readHighestSequenceNr.promise.success(sequence);
    }

    /**
     * Hands a replay request to the data journal after flushing pending writes. The starting sequence number is
     * raised to just past the deletion watermark if the request asks for an earlier one.
     *
     * @param replayMessages the request
     */
    private void handleReplayMessages(ReplayMessages replayMessages) {
        LOG.debug("{}: replaying messages {}", persistenceId, replayMessages);
        ensureOpen();
        flushWrites();

        final long firstAvailable = lastDelete + 1;
        final long from = Long.max(firstAvailable, replayMessages.fromSequenceNr);
        LOG.debug("{}: adjusted fromSequenceNr to {}", persistenceId, from);

        dataJournal.handleReplayMessages(replayMessages, from);
    }

    /**
     * Writes a batch to the data journal, updates the write metrics and passes the outcome to
     * {@link #onWrittenMessages(WrittenMessages)}, which decides when to flush and report the results.
     *
     * @param writeMessages the batch to write
     */
    private void handleWriteMessages(WriteMessages writeMessages) {
        ensureOpen();

        final Stopwatch timer = Stopwatch.createStarted();
        final long before = dataJournal.lastWrittenSequenceNr();
        final WrittenMessages written = dataJournal.handleWriteMessages(writeMessages);
        timer.stop();

        batchWriteTime.update(timer.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
        // The advance of the last written sequence number is what gets counted
        messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - before);
        LOG.debug("{}: write of {} bytes completed in {}", persistenceId, written.writtenBytes, timer);

        onWrittenMessages(written);
    }

    /**
     * Invoked by {@code handleWriteMessages()} with the outcome of each batch written to the data journal.
     *
     * @param writtenMessages outcome of the write
     */
    abstract void onWrittenMessages(WrittenMessages writtenMessages);

    /**
     * Catch-all handler: logs any message not matched by the other handlers at error level.
     *
     * @param obj the unmatched message
     */
    private void handleUnknown(final Object obj) {
        LOG.error("{}: Received unknown message {}", persistenceId, obj);
    }

    /**
     * Opens both journals on first use; later calls only verify that the delete journal is present as well.
     *
     * <p>The delete journal is opened first and its last entry, if any, becomes the deletion watermark
     * ({@code 0} otherwise). The data journal is then created and told about that watermark.
     */
    private void ensureOpen() {
        if (dataJournal != null) {
            Verify.verifyNotNull(deleteJournal);
            return;
        }

        final Stopwatch sw = Stopwatch.createStarted();
        // The explicit <Long> witness is required: a bare builder() chain cannot be assigned to the typed field
        deleteJournal = SegmentedJournal.<Long>builder()
            .withDirectory(directory)
            .withName("delete")
            .withNamespace(DELETE_NAMESPACE)
            .withMaxSegmentSize(DELETE_SEGMENT_SIZE)
            .build();
        final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
        lastDelete = lastEntry == null ? 0L : lastEntry.entry().longValue();

        dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
            maxEntrySize, maxSegmentSize);
        dataJournal.deleteTo(lastDelete);
        LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
            dataJournal.lastWrittenSequenceNr(), lastDelete);
    }

    /**
     * Invoked by the delete, read-highest-sequence and replay handlers before they access the data journal.
     * Within this file one subclass flushes its queued writes here and the other does nothing.
     */
    abstract void flushWrites();
}
