package org.apache.spark.streaming.scheduler;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.internal.Logging;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: ReceivedBlockTracker.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015g!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011AcF\u0007\u0002+)\u0011aCB\u0001\tS:$XM\u001d8bY&\u0011\u0001$\u0006\u0002\b\u0019><w-\u001b8h\u0011!Q\u0002A!A!\u0002\u0013a\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u001e=5\ta!\u0003\u0002 \r\tI1\u000b]1sW\u000e{gN\u001a\u0005\tC\u0001\u0011\t\u0011)A\u0005E\u0005Q\u0001.\u00193p_B\u001cuN\u001c4\u0011\u0005\r:S\"\u0001\u0013\u000b\u0005i)#B\u0001\u0014\t\u0003\u0019A\u0017\rZ8pa&\u0011\u0001\u0006\n\u0002\u000e\u0007>tg-[4ve\u0006$\u0018n\u001c8\t\u0011)\u0002!\u0011!Q\u0001\n-\n\u0011b\u001d;sK\u0006l\u0017\nZ:\u0011\u00071\"tG\u0004\u0002.e9\u0011a&M\u0007\u0002_)\u0011\u0001gG\u0001\u0007yI|w\u000e\u001e 
\n\u0003AI!aM\b\u0002\u000fA\f7m[1hK&\u0011QG\u000e\u0002\u0004'\u0016\f(BA\u001a\u0010!\tq\u0001(\u0003\u0002:\u001f\t\u0019\u0011J\u001c;\t\u0011m\u0002!\u0011!Q\u0001\nq\nQa\u00197pG.\u0004\"!\u0010!\u000e\u0003yR!a\u0010\u0004\u0002\tU$\u0018\u000e\\\u0005\u0003\u0003z\u0012Qa\u00117pG.D\u0001b\u0011\u0001\u0003\u0002\u0003\u0006I\u0001R\u0001\u0019e\u0016\u001cwN^3s\rJ|Wn\u0016:ji\u0016\f\u0005.Z1e\u0019><\u0007C\u0001\bF\u0013\t1uBA\u0004C_>dW-\u00198\t\u0011!\u0003!\u0011!Q\u0001\n%\u000b1c\u00195fG.\u0004x.\u001b8u\t&\u0014x\n\u001d;j_:\u00042A\u0004&M\u0013\tYuB\u0001\u0004PaRLwN\u001c\t\u0003\u001bBs!A\u0004(\n\u0005={\u0011A\u0002)sK\u0012,g-\u0003\u0002R%\n11\u000b\u001e:j]\u001eT!aT\b\t\u000bQ\u0003A\u0011A+\u0002\rqJg.\u001b;?)\u001d1\u0006,\u0017.\\9v\u0003\"a\u0016\u0001\u000e\u0003\tAQAG*A\u0002qAQ!I*A\u0002\tBQAK*A\u0002-BQaO*A\u0002qBQaQ*A\u0002\u0011CQ\u0001S*A\u0002%+Aa\u0018\u0001\u0005A\n\u0011\"+Z2fSZ,GM\u00117pG.\fV/Z;f!\r\tg\r[\u0007\u0002E*\u00111\rZ\u0001\b[V$\u0018M\u00197f\u0015\t)w\"\u0001\u0006d_2dWm\u0019;j_:L!a\u001a2\u0003\u000bE+X-^3\u0011\u0005]K\u0017B\u00016\u0003\u0005E\u0011VmY3jm\u0016$'\t\\8dW&sgm\u001c\u0005\bY\u0002\u0011\r\u0011\"\u0003n\u0003\u0001\u001aHO]3b[&#Gk\\+oC2dwnY1uK\u0012\u0014En\\2l#V,W/Z:\u0016\u00039\u0004B!Y88c&\u0011\u0001O\u0019\u0002\b\u0011\u0006\u001c\b.T1q!\t\u0011h,D\u0001\u0001\u0011\u0019!\b\u0001)A\u0005]\u0006\t3\u000f\u001e:fC6LE\rV8V]\u0006dGn\\2bi\u0016$'\t\\8dWF+X-^3tA!9a\u000f\u0001b\u0001\n\u00139\u0018!\u0006;j[\u0016$v.\u00117m_\u000e\fG/\u001a3CY>\u001c7n]\u000b\u0002qB!\u0011m\\=~!\tQ80D\u0001\u0005\u0013\taHA\u0001\u0003US6,\u0007CA,\u007f\u0013\ty(AA\bBY2|7-\u0019;fI\ncwnY6t\u0011\u001d\t\u0019\u0001\u0001Q\u0001\na\fa\u0003^5nKR{\u0017\t\u001c7pG\u0006$X\r\u001a\"m_\u000e\\7\u000f\t\u0005\n\u0003\u000f\u0001!\u0019!C\u0005\u0003\u0013\t1c\u001e:ji\u0016\f\u0005.Z1e\u0019><w\n\u001d;j_:,\"!a\u0003\u0011\t9Q\u0015Q\u0002\t\u0005\u0003\u001f\t\u0019\"\u0004\u0002\u0002\u0012)\u0011q\b
B\u0005\u0005\u0003+\t\tBA\u0007Xe&$X-\u00115fC\u0012dun\u001a\u0005\t\u00033\u0001\u0001\u0015!\u0003\u0002\f\u0005!rO]5uK\u0006CW-\u00193M_\u001e|\u0005\u000f^5p]\u0002B\u0011\"!\b\u0001\u0001\u0004%I!a\b\u0002-1\f7\u000f^!mY>\u001c\u0017\r^3e\u0005\u0006$8\r\u001b+j[\u0016,\u0012!\u001f\u0005\n\u0003G\u0001\u0001\u0019!C\u0005\u0003K\t!\u0004\\1ti\u0006cGn\\2bi\u0016$')\u0019;dQRKW.Z0%KF$B!a\n\u0002.A\u0019a\"!\u000b\n\u0007\u0005-rB\u0001\u0003V]&$\b\"CA\u0018\u0003C\t\t\u00111\u0001z\u0003\rAH%\r\u0005\b\u0003g\u0001\u0001\u0015)\u0003z\u0003]a\u0017m\u001d;BY2|7-\u0019;fI\n\u000bGo\u00195US6,\u0007\u0005C\u0004\u00028\u0001!\t!!\u000f\u0002\u0011\u0005$GM\u00117pG.$2\u0001RA\u001e\u0011\u001d\ti$!\u000eA\u0002!\f\u0011C]3dK&4X\r\u001a\"m_\u000e\\\u0017J\u001c4p\u0011\u001d\t\t\u0005\u0001C\u0001\u0003\u0007\nQ#\u00197m_\u000e\fG/\u001a\"m_\u000e\\7\u000fV8CCR\u001c\u0007\u000e\u0006\u0003\u0002(\u0005\u0015\u0003bBA$\u0003\u007f\u0001\r!_\u0001\nE\u0006$8\r\u001b+j[\u0016Dq!a\u0013\u0001\t\u0003\ti%\u0001\thKR\u0014En\\2lg>3')\u0019;dQR!\u0011qJA,!\u0019i\u0015\u0011K\u001c\u0002V%\u0019\u00111\u000b*\u0003\u00075\u000b\u0007\u000fE\u0002-i!Dq!a\u0012\u0002J\u0001\u0007\u0011\u0010C\u0004\u0002\\\u0001!\t!!\u0018\u00023\u001d,GO\u00117pG.\u001cxJ\u001a\"bi\u000eD\u0017I\u001c3TiJ,\u0017-\u001c\u000b\u0007\u0003+\ny&!\u0019\t\u000f\u0005\u001d\u0013\u0011\fa\u0001s\"9\u00111MA-\u0001\u00049\u0014\u0001C:ue\u0016\fW.\u00133\t\u000f\u0005\u001d\u0004\u0001\"\u0001\u0002j\u0005a\u0002.Y:V]\u0006dGn\\2bi\u0016$'+Z2fSZ,GM\u00117pG.\u001cX#\u0001#\t\u000f\u00055\u0004\u0001\"\u0001\u0002p\u0005!r-\u001a;V]\u0006dGn\\2bi\u0016$'\t\\8dWN$B!!\u0016\u0002r!9\u00111MA6\u0001\u00049\u0004bBA;\u0001\u0011\u0005\u0011qO\u0001\u0012G2,\u0017M\\;q\u001f2$')\u0019;dQ\u0016\u001cHCBA\u0014\u0003s\ni\bC\u0004\u0002|\u0005M\u0004\u0019A=\u0002#\rdW-\u00198vaRC'/Z:i)&lW\rC\u0004\u0002��\u0005M\u0004\u0019\u0001#\u0002#]\f\u0017\u000e\u001e$pe\u000e{W\u000e\u001d7fi&|g\u000eC\u0004\
u0002\u0004\u0002!\t!!\"\u0002\tM$x\u000e\u001d\u000b\u0003\u0003OAq!!#\u0001\t\u0013\t))A\tsK\u000e|g/\u001a:QCN$XI^3oiNDq!!$\u0001\t\u0013\ty)\u0001\u0006xe&$X\rV8M_\u001e$2\u0001RAI\u0011!\t\u0019*a#A\u0002\u0005U\u0015A\u0002:fG>\u0014H\rE\u0002X\u0003/K1!!'\u0003\u0005q\u0011VmY3jm\u0016$'\t\\8dWR\u0013\u0018mY6fe2{w-\u0012<f]RDq!!(\u0001\t\u0013\ty*A\u000bhKR\u0014VmY3jm\u0016$'\t\\8dWF+X-^3\u0015\u0007E\f\t\u000bC\u0004\u0002d\u0005m\u0005\u0019A\u001c\t\u000f\u0005\u0015\u0006\u0001\"\u0003\u0002(\u0006\u00192M]3bi\u0016<&/\u001b;f\u0003\",\u0017\r\u001a'pOR\u0011\u00111\u0002\u0005\t\u0003W\u0003A\u0011\u0001\u0003\u0002j\u00051\u0012n],sSR,\u0017\t[3bI2{w-\u00128bE2,Gm\u0002\u0005\u00020\nA\t\u0001BAY\u0003Q\u0011VmY3jm\u0016$'\t\\8dWR\u0013\u0018mY6feB\u0019q+a-\u0007\u000f\u0005\u0011\u0001\u0012\u0001\u0003\u00026N\u0019\u00111W\u0007\t\u000fQ\u000b\u0019\f\"\u0001\u0002:R\u0011\u0011\u0011\u0017\u0005\t\u0003{\u000b\u0019\f\"\u0001\u0002@\u0006)2\r[3dWB|\u0017N\u001c;ESJ$v\u000eT8h\t&\u0014Hc\u0001'\u0002B\"9\u00111YA^\u0001\u0004a\u0015!D2iK\u000e\\\u0007o\\5oi\u0012K'\u000f")
/* loaded from: input_file:org/apache/spark/streaming/scheduler/ReceivedBlockTracker.class */
public class ReceivedBlockTracker implements Logging {
    // Spark configuration handed to the constructor. Public with a compiler-mangled name;
    // presumably so the generated $$anonfun$ closure classes can read it — confirm.
    public final SparkConf org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf;
    // Hadoop configuration handed to the constructor (same visibility note as the field above).
    public final Configuration org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf;
    // Stream ids supplied at construction; mapped over when allocating blocks to a batch.
    private final Seq<Object> streamIds;
    // Time source: stamps write-ahead-log records and bounds the cleanup threshold.
    private final Clock clock;
    // Optional checkpoint directory; createWriteAheadLog() maps over it to build the log.
    public final Option<String> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
    // Queues of received blocks keyed by (boxed) stream id; entries are created on first lookup.
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    // Blocks allocated to each batch, keyed by batch time.
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    // Result of createWriteAheadLog(); empty when no checkpoint directory was given.
    private final Option<WriteAheadLog> writeAheadLogOption;
    // Time of the latest allocated batch; null until allocateBlocksToBatch or
    // insertAllocatedBatch$1 sets it.
    private Time lastAllocatedBatchTime;
    // Backing state for the Logging mixin, reached through the
    // org$apache$spark$internal$Logging$$* accessors below.
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private transient int org$apache$spark$internal$Logging$$levelFlags;

    /**
     * Static forwarder to {@code checkpointDirToLogDir} on the companion module.
     *
     * @param str the checkpoint directory
     * @return whatever the companion computes for {@code str} (presumably the write-ahead-log
     *     directory — confirm against {@code ReceivedBlockTracker$})
     */
    public static String checkpointDirToLogDir(String str) {
        final ReceivedBlockTracker$ companion = ReceivedBlockTracker$.MODULE$;
        return companion.checkpointDirToLogDir(str);
    }

    /** Returns the logger field that backs the {@code Logging} mixin for this instance. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Stores the logger field that backs the {@code Logging} mixin.
     *
     * @param logger the logger to keep on this instance
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Returns the level-flags field that backs the {@code Logging} mixin. */
    public int org$apache$spark$internal$Logging$$levelFlags() {
        return this.org$apache$spark$internal$Logging$$levelFlags;
    }

    /**
     * Stores the level-flags field that backs the {@code Logging} mixin.
     *
     * @param i the new flags value
     */
    public void org$apache$spark$internal$Logging$$levelFlags_$eq(int i) {
        this.org$apache$spark$internal$Logging$$levelFlags = i;
    }

    /**
     * Logging mixin forwarder: returns the logger name computed by {@code Logging} for this
     * instance.
     */
    public String logName() {
        // NOTE(review): "Logging.class.<method>" is how the decompiler rendered this call and is
        // not valid Java; presumably it targets the Scala trait's implementation class — confirm
        // against the bytecode.
        return Logging.class.logName(this);
    }

    /** Logging mixin forwarder: returns the logger that {@code Logging} provides for this instance. */
    public Logger log() {
        // NOTE(review): decompiler rendering of a call into the Logging trait; not valid Java as
        // written — confirm the real target against the bytecode.
        return Logging.class.log(this);
    }

    /** Logging mixin forwarder: returns {@code Logging}'s {@code isInfoEnabled} result for this instance. */
    public final boolean isInfoEnabled() {
        return Logging.class.isInfoEnabled(this);
    }

    /** Logging mixin forwarder: returns {@code Logging}'s {@code isDebugEnabled} result for this instance. */
    public final boolean isDebugEnabled() {
        return Logging.class.isDebugEnabled(this);
    }

    /** Logging mixin forwarder: returns {@code Logging}'s {@code isTraceEnabled} result for this instance. */
    public final boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /**
     * Logging mixin forwarder: passes the message thunk to {@code Logging}'s {@code logInfo}.
     *
     * @param function0 thunk producing the message text
     */
    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    /**
     * Logging mixin forwarder: passes the message thunk to {@code Logging}'s {@code logDebug}.
     *
     * @param function0 thunk producing the message text
     */
    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    /**
     * Logging mixin forwarder: passes the message thunk to {@code Logging}'s {@code logTrace}.
     *
     * @param function0 thunk producing the message text
     */
    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    /**
     * Logging mixin forwarder: passes the message thunk to {@code Logging}'s {@code logWarning}.
     *
     * @param function0 thunk producing the message text
     */
    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    /**
     * Logging mixin forwarder: passes the message thunk to {@code Logging}'s {@code logError}.
     *
     * @param function0 thunk producing the message text
     */
    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    /**
     * Logging mixin forwarder: passes the message thunk and a throwable to {@code Logging}'s
     * {@code logInfo}.
     *
     * @param function0 thunk producing the message text
     * @param th throwable to attach to the log entry
     */
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    /**
     * Logging mixin forwarder: passes the message thunk and a throwable to {@code Logging}'s
     * {@code logDebug}.
     *
     * @param function0 thunk producing the message text
     * @param th throwable to attach to the log entry
     */
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    /**
     * Logging mixin forwarder: passes the message thunk and a throwable to {@code Logging}'s
     * {@code logTrace}.
     *
     * @param function0 thunk producing the message text
     * @param th throwable to attach to the log entry
     */
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    /**
     * Logging mixin forwarder: passes the message thunk and a throwable to {@code Logging}'s
     * {@code logWarning}.
     *
     * @param function0 thunk producing the message text
     * @param th throwable to attach to the log entry
     */
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    /**
     * Logging mixin forwarder: passes the message thunk and a throwable to {@code Logging}'s
     * {@code logError}.
     *
     * @param function0 thunk producing the message text
     * @param th throwable to attach to the log entry
     */
    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    /**
     * Logging mixin forwarder: delegates to {@code Logging}'s {@code initializeLogIfNecessary}.
     *
     * @param z flag passed through unchanged (NOTE(review): meaning is defined by the Logging
     *     trait, not visible here — confirm)
     */
    public void initializeLogIfNecessary(boolean z) {
        Logging.class.initializeLogIfNecessary(this, z);
    }

    /** Accessor for the map of received-block queues keyed by boxed stream id. */
    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    /** Accessor for the map from batch time to the blocks allocated to that batch. */
    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    /** Accessor for the optional write-ahead log created in the constructor. */
    private Option<WriteAheadLog> writeAheadLogOption() {
        return this.writeAheadLogOption;
    }

    /** Returns the time of the latest allocated batch, or {@code null} if none has been set. */
    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    /**
     * Records the time of the latest allocated batch.
     *
     * @param time the batch time to remember
     */
    private void lastAllocatedBatchTime_$eq(Time time) {
        this.lastAllocatedBatchTime = time;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v17 */
    /* JADX WARN: Type inference failed for: r0v18, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v22 */
    public boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        try {
            boolean writeToLog = writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            if (writeToLog) {
                ?? r0 = this;
                synchronized (r0) {
                    org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId()).$plus$eq(receivedBlockInfo);
                    r0 = r0;
                    logDebug(new ReceivedBlockTracker$$anonfun$addBlock$1(this, receivedBlockInfo));
                }
            } else {
                logDebug(new ReceivedBlockTracker$$anonfun$addBlock$2(this, receivedBlockInfo));
            }
            return writeToLog;
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            logError(new ReceivedBlockTracker$$anonfun$addBlock$3(this, receivedBlockInfo), (Throwable) unapply.get());
            return false;
        }
    }

    /**
     * Allocates blocks to the batch at {@code time}, unless a batch at the same or a later time
     * has already been allocated.
     *
     * <p>The allocation is recorded in {@code timeToAllocatedBlocks} and
     * {@code lastAllocatedBatchTime} only when the write-ahead-log write succeeds; otherwise an
     * info message is logged and the bookkeeping is left untouched.
     *
     * @param time the batch time to allocate blocks for
     */
    public synchronized void allocateBlocksToBatch(Time time) {
        final Time previous = lastAllocatedBatchTime();
        final boolean isNewerBatch = previous == null || time.$greater(previous);
        if (!isNewerBatch) {
            logInfo(new ReceivedBlockTracker$$anonfun$allocateBlocksToBatch$2(this, time));
            return;
        }

        // One entry per stream id, produced by the closure (presumably by draining that
        // stream's queue — confirm against ReceivedBlockTracker$$anonfun$1).
        TraversableOnce perStream = (TraversableOnce) this.streamIds.map(new ReceivedBlockTracker$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());
        AllocatedBlocks allocatedBlocks = new AllocatedBlocks(perStream.toMap(Predef$.MODULE$.$conforms()));

        if (writeToLog(new BatchAllocationEvent(time, allocatedBlocks))) {
            timeToAllocatedBlocks().put(time, allocatedBlocks);
            lastAllocatedBatchTime_$eq(time);
        } else {
            logInfo(new ReceivedBlockTracker$$anonfun$allocateBlocksToBatch$1(this, time));
        }
    }

    /**
     * Looks up the blocks allocated to the batch at {@code time}.
     *
     * @param time the batch time
     * @return the mapped allocation when one exists for {@code time}; otherwise the fallback
     *     produced by the second closure (presumably an empty map — confirm)
     */
    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time time) {
        Option<AllocatedBlocks> allocation = timeToAllocatedBlocks().get(time);
        return (Map) allocation.map(new ReceivedBlockTracker$$anonfun$getBlocksOfBatch$1(this)).getOrElse(new ReceivedBlockTracker$$anonfun$getBlocksOfBatch$2(this));
    }

    /**
     * Looks up the blocks allocated to one stream within the batch at {@code time}.
     *
     * @param time the batch time
     * @param i the stream id
     * @return the mapped result when an allocation exists for {@code time}; otherwise the
     *     fallback produced by the second closure (presumably an empty sequence — confirm)
     */
    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time time, int i) {
        Option<AllocatedBlocks> allocation = timeToAllocatedBlocks().get(time);
        return (Seq) allocation.map(new ReceivedBlockTracker$$anonfun$getBlocksOfBatchAndStream$1(this, i)).getOrElse(new ReceivedBlockTracker$$anonfun$getBlocksOfBatchAndStream$2(this));
    }

    /**
     * Reports whether at least one per-stream queue fails the closure's predicate.
     *
     * @return {@code true} unless every queue satisfies the predicate (presumably an emptiness
     *     check — confirm against the closure class)
     */
    public synchronized boolean hasUnallocatedReceivedBlocks() {
        HashMap<Object, Queue<ReceivedBlockInfo>> queues = streamIdToUnallocatedBlockQueues();
        boolean everyQueuePasses = queues.values().forall(new ReceivedBlockTracker$$anonfun$hasUnallocatedReceivedBlocks$1(this));
        return !everyQueuePasses;
    }

    /**
     * Returns the blocks currently queued for a stream as a sequence.
     *
     * <p>The lookup goes through the get-or-create queue helper, so asking about an unknown
     * stream id registers a queue for it.
     *
     * @param i the stream id
     * @return the queued blocks for stream {@code i}
     */
    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int i) {
        Queue<ReceivedBlockInfo> pending = org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(i);
        return pending.toSeq();
    }

    /**
     * Drops bookkeeping for the batches selected by the threshold filter and then hands the
     * write-ahead log (if any) to the cleanup closure.
     *
     * <p>Nothing is removed unless the cleanup event is first written to the log successfully;
     * on failure only a warning is logged.
     *
     * @param time cleanup threshold; must be strictly earlier than the clock's current time
     * @param z passed to the log-cleanup closure (presumably "wait for completion" — confirm)
     */
    public synchronized void cleanupOldBatches(Time time, boolean z) {
        long thresholdMillis = time.milliseconds();
        long nowMillis = this.clock.getTimeMillis();
        Predef$.MODULE$.require(thresholdMillis < nowMillis);

        TraversableOnce selected = (TraversableOnce) timeToAllocatedBlocks().keys().filter(new ReceivedBlockTracker$$anonfun$2(this, time));
        Seq timesToCleanup = selected.toSeq();
        logInfo(new ReceivedBlockTracker$$anonfun$cleanupOldBatches$1(this, timesToCleanup));

        if (writeToLog(new BatchCleanupEvent(timesToCleanup))) {
            timeToAllocatedBlocks().$minus$minus$eq(timesToCleanup);
            writeAheadLogOption().foreach(new ReceivedBlockTracker$$anonfun$cleanupOldBatches$2(this, time, z));
        } else {
            logWarning(new ReceivedBlockTracker$$anonfun$cleanupOldBatches$3(this));
        }
    }

    /**
     * Applies the stop closure to the write-ahead log when one exists (presumably closing it —
     * confirm against the closure class); does nothing otherwise.
     */
    public void stop() {
        Option<WriteAheadLog> wal = writeAheadLogOption();
        wal.foreach(new ReceivedBlockTracker$$anonfun$stop$1(this));
    }

    /**
     * Applies the recovery closure to the write-ahead log when one exists; does nothing
     * otherwise. Holds this tracker's monitor for the duration.
     */
    private synchronized void recoverPastEvents() {
        Option<WriteAheadLog> wal = writeAheadLogOption();
        wal.foreach(new ReceivedBlockTracker$$anonfun$recoverPastEvents$1(this));
    }

    /**
     * Serializes an event and appends it to the write-ahead log, stamped with the clock's
     * current time.
     *
     * @param receivedBlockTrackerLogEvent the event to persist
     * @return {@code true} when the log is disabled or the write succeeded; {@code false} when
     *     the write failed with a non-fatal error (which is logged as a warning)
     */
    private boolean writeToLog(ReceivedBlockTrackerLogEvent receivedBlockTrackerLogEvent) {
        if (isWriteAheadLogEnabled()) {
            logTrace(new ReceivedBlockTracker$$anonfun$writeToLog$1(this, receivedBlockTrackerLogEvent));
            try {
                WriteAheadLog wal = writeAheadLogOption().get();
                ByteBuffer payload = ByteBuffer.wrap(Utils$.MODULE$.serialize(receivedBlockTrackerLogEvent));
                wal.write(payload, this.clock.getTimeMillis());
            } catch (Throwable th) {
                // Equivalent of Scala's `case NonFatal(e)`: fatal throwables propagate untouched.
                Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                if (nonFatal.isEmpty()) {
                    throw th;
                }
                logWarning(new ReceivedBlockTracker$$anonfun$writeToLog$2(this, receivedBlockTrackerLogEvent), nonFatal.get());
                return false;
            }
        }
        // Either there is no log to write to, or the write above completed.
        return true;
    }

    /**
     * Returns the block queue for a stream, inserting the closure-supplied queue on first use.
     *
     * @param i the stream id
     * @return the queue registered under {@code i}
     */
    public Queue<ReceivedBlockInfo> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(int i) {
        HashMap<Object, Queue<ReceivedBlockInfo>> queues = streamIdToUnallocatedBlockQueues();
        return (Queue) queues.getOrElseUpdate(BoxesRunTime.boxToInteger(i), new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue$1(this));
    }

    /**
     * Builds the optional write-ahead log by mapping the checkpoint directory, when present,
     * through the factory closure.
     *
     * @return the log wrapped in an option, or an empty option when no checkpoint directory
     *     was configured
     */
    private Option<WriteAheadLog> createWriteAheadLog() {
        Option<String> checkpointDir = this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
        return checkpointDir.map(new ReceivedBlockTracker$$anonfun$createWriteAheadLog$1(this));
    }

    /**
     * Reports whether this tracker holds a write-ahead log.
     *
     * @return {@code true} when the write-ahead-log option is non-empty
     */
    public boolean isWriteAheadLogEnabled() {
        Option<WriteAheadLog> wal = writeAheadLogOption();
        return wal.nonEmpty();
    }

    /**
     * Marks a block's id as invalid and appends the block to its stream's queue, after emitting
     * a trace message. Presumably invoked by the recovery closure when replaying logged
     * additions — confirm against the caller.
     *
     * @param receivedBlockInfo the block to re-insert
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1$1(this, receivedBlockInfo));
        receivedBlockInfo.setBlockIdInvalid();
        Queue<ReceivedBlockInfo> streamQueue = org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId());
        streamQueue.$plus$eq(receivedBlockInfo);
    }

    /**
     * Records an allocation for {@code time}: runs the closure over every per-stream queue
     * (presumably emptying each one — confirm), stores the allocation, and advances the
     * last-allocated batch time.
     *
     * @param time the batch time of the allocation
     * @param allocatedBlocks the blocks allocated to that batch
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(Time time, AllocatedBlocks allocatedBlocks) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1$1(this, time, allocatedBlocks));

        HashMap<Object, Queue<ReceivedBlockInfo>> queues = streamIdToUnallocatedBlockQueues();
        queues.values().foreach(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1$2(this));

        HashMap<Time, AllocatedBlocks> allocations = timeToAllocatedBlocks();
        allocations.put(time, allocatedBlocks);
        lastAllocatedBatchTime_$eq(time);
    }

    /**
     * Removes the allocations recorded for the given batch times, after emitting a trace
     * message.
     *
     * @param seq the batch times whose allocations should be dropped
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(Seq seq) {
        logTrace(new ReceivedBlockTracker$$anonfun$org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1$1(this, seq));
        HashMap<Time, AllocatedBlocks> allocations = timeToAllocatedBlocks();
        allocations.$minus$minus$eq(seq);
    }

    /**
     * Creates a tracker with empty bookkeeping and, when requested, runs recovery against the
     * write-ahead log.
     *
     * @param sparkConf Spark configuration, stored for later use
     * @param configuration Hadoop configuration, stored for later use
     * @param seq the stream ids this tracker allocates blocks for
     * @param clock time source used for log timestamps and cleanup validation
     * @param z when {@code true}, {@code recoverPastEvents()} is called at the end of construction
     * @param option optional checkpoint directory; a write-ahead log is created only when present
     */
    public ReceivedBlockTracker(SparkConf sparkConf, Configuration configuration, Seq<Object> seq, Clock clock, boolean z, Option<String> option) {
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$conf = sparkConf;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$hadoopConf = configuration;
        this.streamIds = seq;
        this.clock = clock;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption = option;
        // Runs the Logging trait initialiser for this instance.
        Logging.class.$init$(this);
        this.streamIdToUnallocatedBlockQueues = new HashMap<>();
        this.timeToAllocatedBlocks = new HashMap<>();
        // Must follow the checkpointDirOption assignment: createWriteAheadLog() reads that field.
        this.writeAheadLogOption = createWriteAheadLog();
        this.lastAllocatedBatchTime = null;
        // Recovery runs last so the write-ahead-log option it iterates is already assigned.
        if (z) {
            recoverPastEvents();
        }
    }
}
