package kafka.server.epoch;

import com.typesafe.scalalogging.Logger;
import java.util.concurrent.Future;
import kafka.cluster.BrokerEndPoint;
import kafka.log.LogManager;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.server.BlockingSend;
import kafka.server.BrokerBlockingSender;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.QuorumTestHarness;
import kafka.utils.Implicits$;
import kafka.utils.Implicits$MapExtensionMethods$;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function2;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable$;
import scala.collection.Iterator;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.compat.MapExtensionMethods$;
import scala.collection.compat.package$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;

/* compiled from: LeaderEpochIntegrationTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%h\u0001B\u0013'\u00015BQ\u0001\u000f\u0001\u0005\u0002eBq\u0001\u0010\u0001A\u0002\u0013\u0005Q\bC\u0004L\u0001\u0001\u0007I\u0011\u0001'\t\rM\u0003\u0001\u0015)\u0003?\u0011\u001d!\u0006A1A\u0005\u0002UCaA\u0018\u0001!\u0002\u00131\u0006bB0\u0001\u0005\u0004%\t!\u0016\u0005\u0007A\u0002\u0001\u000b\u0011\u0002,\t\u000f\u0005\u0004!\u0019!C\u0001E\"1a\u000e\u0001Q\u0001\n\rDqa\u001c\u0001C\u0002\u0013\u0005!\r\u0003\u0004q\u0001\u0001\u0006Ia\u0019\u0005\bc\u0002\u0011\r\u0011\"\u0001c\u0011\u0019\u0011\b\u0001)A\u0005G\"91\u000f\u0001b\u0001\n\u0003\u0011\u0007B\u0002;\u0001A\u0003%1\rC\u0004v\u0001\t\u0007I\u0011\u00012\t\rY\u0004\u0001\u0015!\u0003d\u0011\u001d9\bA1A\u0005\u0002\tDa\u0001\u001f\u0001!\u0002\u0013\u0019\u0007\"C=\u0001\u0001\u0004\u0005\r\u0011\"\u0001{\u0011-\t\t\u0002\u0001a\u0001\u0002\u0004%\t!a\u0005\t\u0015\u0005]\u0001\u00011A\u0001B\u0003&1\u0010C\u0004\u0002\u001a\u0001!\t%a\u0007\t\u000f\u0005M\u0002\u0001\"\u0001\u0002\u001c!9\u0011Q\b\u0001\u0005\u0002\u0005m\u0001bBA!\u0001\u0011\u0005\u00111\u0004\u0005\b\u0003\u000b\u0002A\u0011AA$\u0011\u001d\tY\u0006\u0001C\u0005\u0003WCq!!.\u0001\t\u0013\t9\fC\u0004\u0002R\u0002!I!a5\t\u000f\u0005\u001d\b\u0001\"\u0003\u0002\u001c\u00199\u0011\u0011\u000b\u0001\u0001M\u0005M\u0003BCA.C\t\u0005\t\u0015!\u0003\u0002^!1\u0001(\tC\u0001\u0003GBq!a\u001a\"\t\u0003\tIG\u0001\u000eMK\u0006$WM]#q_\u000eD\u0017J\u001c;fOJ\fG/[8o)\u0016\u001cHO\u0003\u0002(Q\u0005)Q\r]8dQ*\u0011\u0011FK\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003-\nQa[1gW\u0006\u001c\u0001aE\u0002\u0001]I\u0002\"a\f\u0019\u000e\u0003!J!!\r\u0015\u0003#E+xN];n)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000f\u0005\u00024m5\tAG\u0003\u00026U\u0005)Q\u000f^5mg&\u0011q\u0007\u000e\u0002\b\u0019><w-\u001b8h\u0003\u0019a\u0014N\\5u}Q\t!\b\u0005\u0002<\u00015\ta%A\u0004ce>\\WM]:\u0016\u0003y\u00022a\u0010$I\u001b\u0005\u0001%BA!C\u0003\u001diW\u000f^1cY\u0016T!a\u0011#\u0002\u0015\r|G\u000e\\3di&|gNC\u0001F
\u0003\u0015\u00198-\u00197b\u0013\t9\u0005I\u0001\u0006MSN$()\u001e4gKJ\u0004\"aL%\n\u0005)C#aC&bM.\f7+\u001a:wKJ\f1B\u0019:pW\u0016\u00148o\u0018\u0013fcR\u0011Q*\u0015\t\u0003\u001d>k\u0011\u0001R\u0005\u0003!\u0012\u0013A!\u00168ji\"9!kAA\u0001\u0002\u0004q\u0014a\u0001=%c\u0005A!M]8lKJ\u001c\b%\u0001\u0004u_BL7-M\u000b\u0002-B\u0011q\u000bX\u0007\u00021*\u0011\u0011LW\u0001\u0005Y\u0006twMC\u0001\\\u0003\u0011Q\u0017M^1\n\u0005uC&AB*ue&tw-A\u0004u_BL7-\r\u0011\u0002\rQ|\u0007/[23\u0003\u001d!x\u000e]5de\u0001\nA\u0001^\u0019qaU\t1\r\u0005\u0002eY6\tQM\u0003\u0002gO\u000611m\\7n_:T!a\u000b5\u000b\u0005%T\u0017AB1qC\u000eDWMC\u0001l\u0003\ry'oZ\u0005\u0003[\u0016\u0014a\u0002V8qS\u000e\u0004\u0016M\u001d;ji&|g.A\u0003ucA\u0004\u0004%\u0001\u0003ucA\f\u0014!\u0002;2aF\u0002\u0013\u0001\u0002;2aJ\nQ\u0001^\u0019qe\u0001\nA\u0001\u001e\u001aqa\u0005)AO\r91A\u0005!AO\r93\u0003\u0015!(\u0007\u001d\u001a!\u0003\t!\b/A\u0002ua\u0002\n\u0001\u0002\u001d:pIV\u001cWM]\u000b\u0002wB9A0!\u0001\u0002\u0006\u0005\u0015Q\"A?\u000b\u0005et(BA@h\u0003\u001d\u0019G.[3oiNL1!a\u0001~\u00055Y\u0015MZ6b!J|G-^2feB)a*a\u0002\u0002\f%\u0019\u0011\u0011\u0002#\u0003\u000b\u0005\u0013(/Y=\u0011\u00079\u000bi!C\u0002\u0002\u0010\u0011\u0013AAQ=uK\u0006a\u0001O]8ek\u000e,'o\u0018\u0013fcR\u0019Q*!\u0006\t\u000fI3\u0012\u0011!a\u0001w\u0006I\u0001O]8ek\u000e,'\u000fI\u0001\ti\u0016\f'\u000fR8x]R\tQ\nK\u0002\u0019\u0003?\u0001B!!\t\u000205\u0011\u00111\u0005\u0006\u0005\u0003K\t9#A\u0002ba&TA!!\u000b\u0002,\u00059!.\u001e9ji\u0016\u0014(bAA\u0017U\u0006)!.\u001e8ji&!\u0011\u0011GA\u0012\u0005%\te\r^3s\u000b\u0006\u001c\u0007.A\u001ftQ>,H\u000eZ!eI\u000e+(O]3oi2+\u0017\rZ3s\u000bB|7\r\u001b+p\u001b\u0016\u001c8/Y4fg\u0006\u001bH\u000b[3z\u0003J,wK]5ui\u0016tGk\u001c'fC\u0012,'\u000fK\u0002\u001a\u0003o\u0001B!!\t\u0002:%!\u00111HA\u0012\u0005\u0011!Vm\u001d;\u0002WMDw.\u001e7e'\u0016tG\rT3bI\u0016\u0014X\t]8dQJ+\u0017/^3ti\u0006sGmR3u\u0003J+7\u000f]8og\u0016D3AGA\u001c\u00039\u001a\bn\\;mI&s7M
]3bg\u0016dU-\u00193fe\u0016\u0003xn\u00195CKR<X-\u001a8MK\u0006$WM\u001d*fgR\f'\u000f^:)\u0007m\t9$\u0001\u0017tQ>,H\u000eZ*vaB|'\u000f\u001e*fcV,7\u000f^:G_J,\u0005o\\2ig:{Go\u00148UQ\u0016dU-\u00193feR\u0019Q*!\u0013\t\u000f\u0005-C\u00041\u0001\u0002N\u00059a-\u001a;dQ\u0016\u0014\bcAA(C5\t\u0001AA\tUKN$h)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012\u001cB!IA+eA\u0019a*a\u0016\n\u0007\u0005eCI\u0001\u0004B]f\u0014VMZ\u0001\u0007g\u0016tG-\u001a:\u0011\u0007=\ny&C\u0002\u0002b!\u0012AB\u00117pG.LgnZ*f]\u0012$B!!\u0014\u0002f!9\u00111L\u0012A\u0002\u0005u\u0013\u0001\u00057fC\u0012,'o\u00144gg\u0016$8OR8s)\u0011\tY'a(\u0011\u000f\u00055\u0014qN2\u0002t5\t!)C\u0002\u0002r\t\u00131!T1q!\u0011\t)(!'\u000f\t\u0005]\u00141\u0013\b\u0005\u0003s\nyI\u0004\u0003\u0002|\u00055e\u0002BA?\u0003\u0017sA!a \u0002\n:!\u0011\u0011QAD\u001b\t\t\u0019IC\u0002\u0002\u00062\na\u0001\u0010:p_Rt\u0014\"A6\n\u0005%T\u0017BA\u0016i\u0013\t1w-C\u0002\u0002\u0012\u0016\fq!\\3tg\u0006<W-\u0003\u0003\u0002\u0016\u0006]\u0015\u0001I(gMN,GOR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007NU3ta>t7/\u001a#bi\u0006T1!!%f\u0013\u0011\tY*!(\u0003\u001d\u0015\u0003xn\u00195F]\u0012|eMZ:fi*!\u0011QSAL\u0011\u001d\t\t\u000b\na\u0001\u0003G\u000b!\u0002]1si&$\u0018n\u001c8t!\u001d\ti'a\u001cd\u0003K\u00032ATAT\u0013\r\tI\u000b\u0012\u0002\u0004\u0013:$HCBA/\u0003[\u000b\t\f\u0003\u0004\u00020v\u0001\r\u0001S\u0001\u0005MJ|W\u000e\u0003\u0004\u00024v\u0001\r\u0001S\u0001\u0003i>\fAc^1ji\u001a{'/\u00129pG\"\u001c\u0005.\u00198hKR{GcB'\u0002:\u0006-\u0017q\u001a\u0005\b\u0003ws\u0002\u0019AA_\u0003\u0015!x\u000e]5d!\u0011\ty,a2\u000f\t\u0005\u0005\u00171\u0019\t\u0004\u0003\u0003#\u0015bAAc\t\u00061\u0001K]3eK\u001aL1!XAe\u0015\r\t)\r\u0012\u0005\b\u0003\u001bt\u0002\u0019AAS\u0003%\u0001\u0018M\u001d;ji&|g\u000e\u0003\u0004(=\u0001\u0007\u0011QU\u0001\u0018[\u0016\u001c8/Y4fg\"\u000bg/\u001a'fC\u0012,'/\u00129pG\"$\u0002\"!6\u0002\\\u0006}\u00171\u001d\t\u0004\u001d\u0006]\u0017bAAm\t\n9!i\\8mK\u0006t\u0007BBAo?
\u0001\u0007\u0001*\u0001\u0004ce>\\WM\u001d\u0005\b\u0003C|\u0002\u0019AAS\u0003M)\u0007\u0010]3di\u0016$G*Z1eKJ,\u0005o\\2i\u0011\u001d\t)o\ba\u0001\u0003K\u000b\u0011\"\\5o\u001f\u001a47/\u001a;\u00027M,g\u000e\u001a$pkJlUm]:bO\u0016\u001cHk\\#bG\"$v\u000e]5d\u0001")
/* loaded from: input_file:kafka/server/epoch/LeaderEpochIntegrationTest.class */
public class LeaderEpochIntegrationTest extends QuorumTestHarness {
    // Brokers registered by the individual tests; tearDown() hands this buffer to TestUtils.shutdownServers.
    private ListBuffer<KafkaServer> brokers = ListBuffer$.MODULE$.apply(Nil$.MODULE$);
    // Names of the two topics the tests create.
    private final String topic1 = "foo";
    private final String topic2 = "bar";
    // Pre-built partition handles; the field name encodes <topic><partition>, e.g. t1p2 = topic1, partition 2.
    private final TopicPartition t1p0 = new TopicPartition(topic1(), 0);
    private final TopicPartition t1p1 = new TopicPartition(topic1(), 1);
    private final TopicPartition t1p2 = new TopicPartition(topic1(), 2);
    private final TopicPartition t2p0 = new TopicPartition(topic2(), 0);
    private final TopicPartition t2p2 = new TopicPartition(topic2(), 2);
    // Alias of t1p0 (initialised from the t1p0() accessor).
    private final TopicPartition tp = t1p0();
    // Not initialised here; tests assign it through producer_$eq and tearDown() closes it when non-null.
    private KafkaProducer<byte[], byte[]> producer;

    /* compiled from: LeaderEpochIntegrationTest.scala */
    /* loaded from: input_file:kafka/server/epoch/LeaderEpochIntegrationTest$TestFetcherThread.class */
    /**
     * Test helper that sends OffsetsForLeaderEpoch requests through a {@link BlockingSend} and
     * returns the per-partition results as a Scala map.
     *
     * <p>The class implements {@link Logging}; every logging method below simply forwards to the
     * matching static {@code Logging.<name>$} helper, passing {@code this}.
     *
     * <p>NOTE(review): this is decompiler output (see the JADX warnings further down). A few spots
     * ({@code ?? r0} in {@code logger$lzycompute}, {@code r2} in {@code leaderOffsetsFor}) are
     * decompiler placeholders rather than valid Java and would need manual repair before compiling.
     */
    public class TestFetcherThread implements Logging {
        // Channel used by leaderOffsetsFor to send the request.
        private final BlockingSend sender;
        // Lazily initialised by logger$lzycompute(); bitmap$0 records whether that has happened.
        private Logger logger;
        private String logIdent;
        private volatile boolean bitmap$0;
        // Enclosing test instance, captured in the constructor.
        public final /* synthetic */ LeaderEpochIntegrationTest $outer;

        // ---- Logging forwarders: each delegates to the static helper on Logging ----

        public String loggerName() {
            return Logging.loggerName$(this);
        }

        public String msgWithLogIdent(String str) {
            return Logging.msgWithLogIdent$(this, str);
        }

        public void trace(Function0<String> function0) {
            Logging.trace$(this, function0);
        }

        public void trace(Function0<String> function0, Function0<Throwable> function02) {
            Logging.trace$(this, function0, function02);
        }

        public boolean isDebugEnabled() {
            return Logging.isDebugEnabled$(this);
        }

        public boolean isTraceEnabled() {
            return Logging.isTraceEnabled$(this);
        }

        public void debug(Function0<String> function0) {
            Logging.debug$(this, function0);
        }

        public void debug(Function0<String> function0, Function0<Throwable> function02) {
            Logging.debug$(this, function0, function02);
        }

        public void info(Function0<String> function0) {
            Logging.info$(this, function0);
        }

        public void info(Function0<String> function0, Function0<Throwable> function02) {
            Logging.info$(this, function0, function02);
        }

        public void warn(Function0<String> function0) {
            Logging.warn$(this, function0);
        }

        public void warn(Function0<String> function0, Function0<Throwable> function02) {
            Logging.warn$(this, function0, function02);
        }

        public void error(Function0<String> function0) {
            Logging.error$(this, function0);
        }

        public void error(Function0<String> function0, Function0<Throwable> function02) {
            Logging.error$(this, function0, function02);
        }

        public void fatal(Function0<String> function0) {
            Logging.fatal$(this, function0);
        }

        public void fatal(Function0<String> function0, Function0<Throwable> function02) {
            Logging.fatal$(this, function0, function02);
        }

        /**
         * Slow path of {@link #logger()}: while holding the monitor of {@code this}, creates the
         * logger via {@code Logging.logger$} if {@code bitmap$0} is still false, then sets the flag.
         *
         * <p>NOTE(review): {@code ?? r0} is a decompiler placeholder for a local whose type could
         * not be inferred (see the warnings below); {@code r0} is only ever assigned {@code this}.
         *
         * @return the initialised logger
         */
        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v0 */
        /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
        /* JADX WARN: Type inference failed for: r0v8, types: [kafka.server.epoch.LeaderEpochIntegrationTest$TestFetcherThread] */
        private Logger logger$lzycompute() {
            ?? r0 = this;
            synchronized (r0) {
                if (!this.bitmap$0) {
                    this.logger = Logging.logger$(this);
                    r0 = this;
                    r0.bitmap$0 = true;
                }
                return this.logger;
            }
        }

        /**
         * Returns the logger, taking the synchronized slow path only while the volatile
         * {@code bitmap$0} flag is still false.
         */
        public Logger logger() {
            return !this.bitmap$0 ? logger$lzycompute() : this.logger;
        }

        public String logIdent() {
            return this.logIdent;
        }

        public void logIdent_$eq(String str) {
            this.logIdent = str;
        }

        /**
         * Sends one OffsetsForLeaderEpoch request covering every entry of {@code map} and returns
         * the response flattened into a map keyed by {@link TopicPartition}.
         *
         * @param map partition to leader epoch; each value is unboxed with
         *            {@code BoxesRunTime.unboxToInt} before being placed in the request
         * @return one {@code EpochEndOffset} per partition found in the response
         */
        public Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> leaderOffsetsFor(Map<TopicPartition, Object> map) {
            // Request payload, sized up front from the number of requested partitions.
            OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection offsetForLeaderTopicCollection = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection(map.size());
            Implicits$MapExtensionMethods$ implicits$MapExtensionMethods$ = Implicits$MapExtensionMethods$.MODULE$;
            Map MapExtensionMethods = Implicits$.MODULE$.MapExtensionMethods(map);
            // Per-entry callback: adds the partition and its epoch to the payload (see $anonfun$leaderOffsetsFor$1).
            Function2 function2 = (topicPartition, obj) -> {
                return BoxesRunTime.boxToBoolean($anonfun$leaderOffsetsFor$1(offsetForLeaderTopicCollection, topicPartition, BoxesRunTime.unboxToInt(obj)));
            };
            // Null guard on the Scala module instance; `throw null` raises a NullPointerException.
            if (implicits$MapExtensionMethods$ == null) {
                throw null;
            }
            // NOTE(review): `r2` is not declared anywhere in this method; it looks like a decompiler
            // register name that presumably stands for `function2` above - confirm before relying on it.
            MapExtensionMethods$.MODULE$.foreachEntry$extension(package$.MODULE$.toMapExtensionMethods(MapExtensionMethods), (v1, v2) -> {
                return Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(r2, v1, v2);
            });
            // Build the request with the latest API version, send it, then turn every
            // (topic result, partition result) pair into a TopicPartition -> EpochEndOffset entry.
            // The third argument to forFollower (1) is presumably the replica id - TODO confirm.
            return ((TraversableOnce) ((TraversableLike) CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.sender.sendRequest(OffsetsForLeaderEpochRequest.Builder.forFollower(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), offsetForLeaderTopicCollection, 1)).responseBody().data().topics()).asScala()).flatMap(offsetForLeaderTopicResult -> {
                return (Buffer) ((TraversableLike) CollectionConverters$.MODULE$.asScalaBufferConverter(offsetForLeaderTopicResult.partitions()).asScala()).map(epochEndOffset -> {
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new TopicPartition(offsetForLeaderTopicResult.topic(), epochEndOffset.partition())), epochEndOffset);
                }, Buffer$.MODULE$.canBuildFrom());
            }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        }

        // Synthetic accessor for the enclosing test instance.
        public /* synthetic */ LeaderEpochIntegrationTest kafka$server$epoch$LeaderEpochIntegrationTest$TestFetcherThread$$$outer() {
            return this.$outer;
        }

        /**
         * Adds one partition/epoch pair to the request payload, creating the topic entry first
         * when {@code find} returns null for that topic name.
         *
         * @return the result of adding the partition to the topic's partition list
         */
        public static final /* synthetic */ boolean $anonfun$leaderOffsetsFor$1(OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection offsetForLeaderTopicCollection, TopicPartition topicPartition, int i) {
            ImplicitLinkedHashCollection.Element find = offsetForLeaderTopicCollection.find(topicPartition.topic());
            if (find == null) {
                // First partition seen for this topic: create and register the topic entry.
                find = new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic().setTopic(topicPartition.topic());
                offsetForLeaderTopicCollection.add(find);
            }
            return find.partitions().add(new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(topicPartition.partition()).setLeaderEpoch(i));
        }

        /**
         * @param leaderEpochIntegrationTest enclosing test instance; a null value triggers
         *                                   {@code throw null}, i.e. a NullPointerException
         * @param blockingSend               sender used by {@link #leaderOffsetsFor}
         */
        public TestFetcherThread(LeaderEpochIntegrationTest leaderEpochIntegrationTest, BlockingSend blockingSend) {
            this.sender = blockingSend;
            if (leaderEpochIntegrationTest == null) {
                throw null;
            }
            this.$outer = leaderEpochIntegrationTest;
            // Runs the Logging trait initialiser for this instance.
            Logging.$init$(this);
        }
    }

    /** Returns the buffer holding the brokers registered by the running test. */
    public ListBuffer<KafkaServer> brokers() {
        return brokers;
    }

    /** Scala-style setter: replaces the broker buffer with {@code listBuffer}. */
    public void brokers_$eq(ListBuffer<KafkaServer> listBuffer) {
        brokers = listBuffer;
    }

    /** Returns the name of the first test topic. */
    public String topic1() {
        return topic1;
    }

    /** Returns the name of the second test topic. */
    public String topic2() {
        return topic2;
    }

    /** Returns the handle for partition 0 of the first topic. */
    public TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the handle for partition 1 of the first topic. */
    public TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the handle for partition 2 of the first topic. */
    public TopicPartition t1p2() {
        return t1p2;
    }

    /** Returns the handle for partition 0 of the second topic. */
    public TopicPartition t2p0() {
        return t2p0;
    }

    /** Returns the handle for partition 2 of the second topic. */
    public TopicPartition t2p2() {
        return t2p2;
    }

    /** Returns the default partition handle used by the single-partition tests. */
    public TopicPartition tp() {
        return tp;
    }

    /** Returns the producer assigned by the running test, or {@code null} if none was assigned. */
    public KafkaProducer<byte[], byte[]> producer() {
        return producer;
    }

    /** Scala-style setter: stores {@code kafkaProducer} as the producer used by the test. */
    public void producer_$eq(KafkaProducer<byte[], byte[]> kafkaProducer) {
        producer = kafkaProducer;
    }

    /**
     * Releases everything a test may have started: closes the producer (when one was created),
     * shuts down all registered brokers and finally runs the harness tear-down.
     *
     * <p>The steps are chained with {@code try/finally} so that a failure in an earlier step
     * (for example an exception thrown while closing the producer) can no longer prevent the
     * brokers from being shut down or the parent harness from being torn down.
     */
    @Override // kafka.server.QuorumTestHarness
    @AfterEach
    public void tearDown() {
        try {
            if (producer() != null) {
                producer().close();
            }
        } finally {
            try {
                TestUtils$.MODULE$.shutdownServers(brokers(), TestUtils$.MODULE$.shutdownServers$default$2());
            } finally {
                // Always give the parent harness a chance to clean up, even if broker shutdown failed.
                super.tearDown();
            }
        }
    }

    /**
     * Sends messages to both topics, waits for a predicate over the current expected-epoch holder
     * to become true, restarts the first broker, waits for the epoch of partition 0 of both topics
     * to change to 1, then sends more messages and waits on a second predicate.
     *
     * <p>The two polling loops have the same shape: evaluate the predicate, fail the test once the
     * timeout ({@code waitUntilTrue$default$3}) has elapsed, otherwise sleep and retry. The
     * predicates, failure messages and the per-id broker factory are synthetic methods defined
     * outside this view.
     */
    @Test
    public void shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
        // Register one entry per id in 0..1; the per-id factory lambda is defined outside this view.
        brokers().$plus$plus$eq((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(0), 1).map(obj -> {
            return $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom()));
        // Create topic1 and topic2, each with partition 0 assigned to replicas 0 and 1.
        new $colon.colon(topic1(), new $colon.colon(topic2(), Nil$.MODULE$)).foreach(str -> {
            return TestUtils$.MODULE$.createTopic(this.zkClient(), str, (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), this.brokers());
        });
        sendFourMessagesToEachTopic();
        // Mutable int shared with the predicates below; presumably the expected leader epoch
        // (it is later set to 1 and passed to waitForEpochChangeTo) - confirm against the predicates.
        IntRef create = IntRef.create(0);
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long waitUntilTrue$default$3 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$4 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$ == null) {
            throw null;
        }
        // First polling loop: retry until predicate $3 holds or the timeout elapses.
        long currentTimeMillis = System.currentTimeMillis();
        while (!$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$3(this, create)) {
            if (System.currentTimeMillis() > currentTimeMillis + waitUntilTrue$default$3) {
                Assertions.fail($anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4());
            }
            // Sleep for the smaller of the two default values before the next attempt.
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$3), waitUntilTrue$default$4));
        }
        // Bounce the first registered broker.
        ((KafkaServer) brokers().apply(0)).shutdown();
        ((KafkaServer) brokers().apply(0)).startup();
        // From here on the value shared with the predicates is 1.
        create.elem = 1;
        waitForEpochChangeTo(topic1(), 0, create.elem);
        waitForEpochChangeTo(topic2(), 0, create.elem);
        sendFourMessagesToEachTopic();
        TestUtils$ testUtils$2 = TestUtils$.MODULE$;
        long waitUntilTrue$default$32 = TestUtils$.MODULE$.waitUntilTrue$default$3();
        long waitUntilTrue$default$42 = TestUtils$.MODULE$.waitUntilTrue$default$4();
        if (testUtils$2 == null) {
            throw null;
        }
        // Second polling loop: same shape as the first, using predicate $5 and message $6.
        long currentTimeMillis2 = System.currentTimeMillis();
        while (!$anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$5(this, create)) {
            if (System.currentTimeMillis() > currentTimeMillis2 + waitUntilTrue$default$32) {
                Assertions.fail($anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6());
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue$default$32), waitUntilTrue$default$42));
        }
    }

    /**
     * Sets up three brokers (ids 100..102) and two topics, runs three producer loops, then sends
     * OffsetsForLeaderEpoch requests for epoch 0 and checks the returned end offsets and error
     * codes.
     *
     * <p>The loop bodies and the {@code sender(...)} helper are defined outside this view. Given
     * the asserted end offsets (10, 20 and 30), the loops presumably write that many records to
     * t1p0, t1p1 and t2p0 respectively - confirm against the lambdas.
     */
    @Test
    public void shouldSendLeaderEpochRequestAndGetAResponse() {
        // Register one entry per id in 100..102; the per-id factory lambda is defined outside this view.
        brokers().$plus$plus$eq((TraversableOnce) RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(100), 102).map(obj -> {
            return $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$1(this, BoxesRunTime.unboxToInt(obj));
        }, IndexedSeq$.MODULE$.canBuildFrom()));
        // topic1: partition 0 -> replica 100, partition 1 -> replica 101.
        TestUtils$.MODULE$.createTopic(zkClient(), topic1(), Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{100}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(1)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{101})))})), brokers());
        // topic2: partition 0 -> replica 100.
        TestUtils$.MODULE$.createTopic(zkClient(), topic2(), Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{100})))})), brokers());
        // Producer with all-default settings except the second argument (-1; presumably acks - TODO confirm).
        producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(brokers()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        // Three loops of 10, 20 and 30 iterations; bodies are synthetic lambdas outside this view.
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 10).foreach(obj2 -> {
            return $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(this, BoxesRunTime.unboxToInt(obj2));
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 20).foreach(obj3 -> {
            return $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(this, BoxesRunTime.unboxToInt(obj3));
        });
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), 30).foreach(obj4 -> {
            return $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(this, BoxesRunTime.unboxToInt(obj4));
        });
        producer().flush();
        // Fetcher whose sender is built from brokers(2) and brokers(0).
        TestFetcherThread testFetcherThread = new TestFetcherThread(this, sender((KafkaServer) brokers().apply(2), (KafkaServer) brokers().apply(0)));
        // Ask for epoch 0 on t1p0, t1p1, t2p0 and t2p2.
        Map<TopicPartition, Object> map = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), BoxesRunTime.boxToInteger(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p1()), BoxesRunTime.boxToInteger(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p0()), BoxesRunTime.boxToInteger(0)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t2p2()), BoxesRunTime.boxToInteger(0))}));
        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> leaderOffsetsFor = testFetcherThread.leaderOffsetsFor(map);
        // Expected end offsets for the two partitions assigned to replica 100.
        Assertions.assertEquals(10L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) leaderOffsetsFor.apply(t1p0())).endOffset());
        Assertions.assertEquals(30L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) leaderOffsetsFor.apply(t2p0())).endOffset());
        // t1p1 is assigned to replica 101, so this response must carry NOT_LEADER_OR_FOLLOWER and end offset -1.
        Assertions.assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code(), ((OffsetForLeaderEpochResponseData.EpochEndOffset) leaderOffsetsFor.apply(t1p1())).errorCode());
        Assertions.assertEquals(-1L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) leaderOffsetsFor.apply(t1p1())).endOffset());
        // Repeating the request through a sender built from brokers(2) and brokers(1) must yield end offset 20 for t1p1.
        Assertions.assertEquals(20L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) new TestFetcherThread(this, sender((KafkaServer) brokers().apply(2), (KafkaServer) brokers().apply(1))).leaderOffsetsFor(map).apply(t1p1())).endOffset());
    }

    /**
     * Starts brokers 100 and 101, places the single test partition on replica 101, and checks the
     * OffsetsForLeaderEpoch answers after each of two restarts of the second broker.
     *
     * <p>Three phases, each starting with one record being produced:
     * <ol>
     *   <li>initial state: epoch 0 ends at offset 1;</li>
     *   <li>after the first bounce: epochs 0 and 1 both resolve to epoch 0 / offset 1, epoch 2
     *       resolves to offset 2;</li>
     *   <li>after the second bounce: epoch 0 -> 1, epoch 2 -> 2, epoch 4 -> 3.</li>
     * </ol>
     *
     * <p>{@code leo$1()} and {@code sender(...)} are helpers defined outside this view;
     * {@code leo$1()} presumably returns the log end offset of the partition - confirm.
     */
    @Test
    public void shouldIncreaseLeaderEpochBetweenLeaderRestarts() {
        // Start broker 100 first and require it to be the elected controller.
        brokers().$plus$eq(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(100, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21())), TestUtils$.MODULE$.createServer$default$2()));
        Assertions.assertEquals(100, TestUtils$.MODULE$.waitUntilControllerElected(zkClient(), TestUtils$.MODULE$.waitUntilControllerElected$default$2()));
        // Then start broker 101; it becomes brokers(1), the broker that is bounced below.
        brokers().$plus$eq(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(101, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20(), TestUtils$.MODULE$.createBrokerConfig$default$21())), TestUtils$.MODULE$.createServer$default$2()));
        // The single partition under test lives only on replica 101.
        TestUtils$.MODULE$.createTopic(zkClient(), tp().topic(), (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(tp().partition())), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{101})))})), brokers());
        // Producer with all-default settings except the second argument (-1; presumably acks - TODO confirm).
        producer_$eq(TestUtils$.MODULE$.createProducer(TestUtils$.MODULE$.plaintextBootstrapServers(brokers()), -1, TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), TestUtils$.MODULE$.createProducer$default$14(), TestUtils$.MODULE$.createProducer$default$15(), TestUtils$.MODULE$.createProducer$default$16()));
        // Phase 1: one record, sent synchronously (get() blocks on the returned future).
        producer().send(new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), (Object) null, "IHeartLogs".getBytes())).get();
        // Query epoch 0 through a sender built from brokers(0) and brokers(1).
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = (OffsetForLeaderEpochResponseData.EpochEndOffset) new TestFetcherThread(this, sender((KafkaServer) brokers().apply(0), (KafkaServer) brokers().apply(1))).leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(0))}))).apply(tp());
        Assertions.assertEquals(0, epochEndOffset.leaderEpoch());
        Assertions.assertEquals(1L, epochEndOffset.endOffset());
        Assertions.assertEquals(1L, leo$1());
        // Phase 2: bounce the second broker, then produce one more record.
        ((KafkaServer) brokers().apply(1)).shutdown();
        ((KafkaServer) brokers().apply(1)).startup();
        producer().send(new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), (Object) null, "IHeartLogs".getBytes())).get();
        // A fresh fetcher/sender is created after the restart.
        TestFetcherThread testFetcherThread = new TestFetcherThread(this, sender((KafkaServer) brokers().apply(0), (KafkaServer) brokers().apply(1)));
        // Requested epoch 0: still epoch 0 ending at offset 1.
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset2 = (OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(0))}))).apply(tp());
        Assertions.assertEquals(1L, epochEndOffset2.endOffset());
        Assertions.assertEquals(0, epochEndOffset2.leaderEpoch());
        // Requested epoch 1: the answer falls back to epoch 0 / offset 1.
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset3 = (OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(1))}))).apply(tp());
        Assertions.assertEquals(0, epochEndOffset3.leaderEpoch());
        Assertions.assertEquals(1L, epochEndOffset3.endOffset());
        // Requested epoch 2: expected to be epoch 2 ending at offset 2, matching leo$1().
        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset4 = (OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(2))}))).apply(tp());
        Assertions.assertEquals(2, epochEndOffset4.leaderEpoch());
        Assertions.assertEquals(2L, epochEndOffset4.endOffset());
        Assertions.assertEquals(2L, leo$1());
        // Phase 3: bounce the second broker again and produce a third record.
        ((KafkaServer) brokers().apply(1)).shutdown();
        ((KafkaServer) brokers().apply(1)).startup();
        producer().send(new ProducerRecord(tp().topic(), Predef$.MODULE$.int2Integer(tp().partition()), (Object) null, "IHeartLogs".getBytes())).get();
        TestFetcherThread testFetcherThread2 = new TestFetcherThread(this, sender((KafkaServer) brokers().apply(0), (KafkaServer) brokers().apply(1)));
        // Epoch 0 -> offset 1, epoch 2 -> offset 2, epoch 4 -> offset 3 (which must also equal leo$1()).
        Assertions.assertEquals(1L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread2.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(0))}))).apply(tp())).endOffset());
        Assertions.assertEquals(2L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread2.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(2))}))).apply(tp())).endOffset());
        Assertions.assertEquals(3L, ((OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread2.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(4))}))).apply(tp())).endOffset());
        Assertions.assertEquals(leo$1(), ((OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread2.leaderOffsetsFor((Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp()), BoxesRunTime.boxToInteger(4))}))).apply(tp())).endOffset());
        // Reuse the cluster state for the extra assertions on epochs the leader never held.
        shouldSupportRequestsForEpochsNotOnTheLeader(testFetcherThread2);
    }

    /**
     * Asks the leader, through the given fetcher, for the end offset of partition
     * {@code t1p0()} at epochs 1, 3 and 5 and asserts that the reported end offsets
     * are 1, 2 and -1 respectively.
     *
     * @param testFetcherThread fetcher whose {@code leaderOffsetsFor} performs the lookups
     */
    public void shouldSupportRequestsForEpochsNotOnTheLeader(TestFetcherThread testFetcherThread) {
        // Parallel tables: requestedEpochs[k] must yield expectedEndOffsets[k].
        final int[] requestedEpochs = {1, 3, 5};
        final long[] expectedEndOffsets = {1L, 2L, -1L};
        for (int idx = 0; idx < requestedEpochs.length; idx++) {
            // Single-entry Scala map of the form (t1p0 -> requested epoch).
            Map epochByPartition = (Map) Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(t1p0()), BoxesRunTime.boxToInteger(requestedEpochs[idx]))}));
            OffsetForLeaderEpochResponseData.EpochEndOffset reply = (OffsetForLeaderEpochResponseData.EpochEndOffset) testFetcherThread.leaderOffsetsFor(epochByPartition).apply(t1p0());
            Assertions.assertEquals(expectedEndOffsets[idx], reply.endOffset());
        }
    }

    /**
     * Builds a {@link BrokerBlockingSender} configured from {@code kafkaServer} and aimed
     * at the node of {@code kafkaServer2}.
     *
     * <p>The target node is looked up in {@code kafkaServer}'s metadata cache using
     * {@code kafkaServer2}'s broker id and {@code kafkaServer}'s inter-broker listener name.
     * NOTE(review): the lookup result is unwrapped with {@code get()}, so this presumably
     * throws when the node is not known as alive — confirm.
     *
     * @param kafkaServer  broker whose metadata cache and config are used
     * @param kafkaServer2 broker to connect to
     * @return a sender with fixed fetcher id 42 and client id prefix "TestFetcher"
     */
    private BlockingSend sender(KafkaServer kafkaServer, KafkaServer kafkaServer2) {
        final Node target = (Node) kafkaServer.metadataCache().getAliveBrokerNode(kafkaServer2.config().brokerId(), kafkaServer.config().interBrokerListenerName()).get();
        final BrokerEndPoint endPoint = new BrokerEndPoint(target.id(), target.host(), target.port());
        return new BrokerBlockingSender(
            endPoint,
            kafkaServer.config(),
            new Metrics(),
            new SystemTime(),
            42,
            "TestFetcher",
            new LogContext());
    }

    /**
     * Polls until {@code $anonfun$waitForEpochChangeTo$1} reports that partition
     * {@code (str, i)} has leader epoch {@code i2}, failing the test with the message from
     * {@code $anonfun$waitForEpochChangeTo$3} once the wait budget is exceeded.
     *
     * <p>This is an inlined copy of {@code TestUtils.waitUntilTrue} using its default
     * timeout and pause values.
     *
     * @param str topic name
     * @param i   partition index
     * @param i2  leader epoch to wait for
     */
    private void waitForEpochChangeTo(String str, int i, int i2) {
        final long timeoutMs = TestUtils$.MODULE$.waitUntilTrue$default$3();
        final long pauseMs = TestUtils$.MODULE$.waitUntilTrue$default$4();
        // Null guard on the Scala module instance, carried over from the inlined call.
        if (TestUtils$.MODULE$ == null) {
            throw null;
        }
        final long startedAt = System.currentTimeMillis();
        for (;;) {
            if ($anonfun$waitForEpochChangeTo$1(this, str, i, i2)) {
                return;
            }
            if (System.currentTimeMillis() > startedAt + timeoutMs) {
                Assertions.fail($anonfun$waitForEpochChangeTo$3());
            }
            // Never sleep longer than the overall timeout.
            Thread.sleep(Math.min(timeoutMs, pauseMs));
        }
    }

    /**
     * Runs the per-topic check {@code $anonfun$messagesHaveLeaderEpoch$1} for
     * {@code topic1()} and {@code topic2()} and reports whether it held for both.
     *
     * @param kafkaServer broker whose log end offset is inspected by the per-topic check
     * @param i           leader epoch every inspected record batch must carry
     * @param i2          offset from which log segments are read
     * @return {@code true} only if the accumulated flag is still set after both topics
     */
    private boolean messagesHaveLeaderEpoch(KafkaServer kafkaServer, int i, int i2) {
        final BooleanRef allTopicsMatch = BooleanRef.create(true);
        final $colon.colon topics = new $colon.colon(topic1(), new $colon.colon(topic2(), Nil$.MODULE$));
        topics.foreach(topicName -> {
            // The helper takes (offset, epoch), i.e. the reverse of this method's (epoch, offset).
            $anonfun$messagesHaveLeaderEpoch$1(this, kafkaServer, allTopicsMatch, i2, i, topicName);
            return BoxedUnit.UNIT;
        });
        return allTopicsMatch.elem;
    }

    /**
     * Sends "test1".."test4" to {@code topic1()} and "test5".."test8" to {@code topic2()},
     * using each string as both the second and third {@link ProducerRecord} argument, and
     * waits for every send to complete.
     *
     * <p>A dedicated producer is created from the plaintext bootstrap servers of
     * {@code brokers()} with two {@link StringSerializer} instances and is always closed,
     * even when a send or the wait on its future fails.
     */
    private void sendFourMessagesToEachTopic() {
        $colon.colon colonVar = new $colon.colon("test1", new $colon.colon("test2", new $colon.colon("test3", new $colon.colon("test4", Nil$.MODULE$))));
        $colon.colon colonVar2 = new $colon.colon("test5", new $colon.colon("test6", new $colon.colon("test7", new $colon.colon("test8", Nil$.MODULE$))));
        String plaintextBootstrapServers = TestUtils$.MODULE$.plaintextBootstrapServers(brokers());
        StringSerializer stringSerializer = new StringSerializer();
        StringSerializer stringSerializer2 = new StringSerializer();
        // try-with-resources: previously close() was skipped whenever a send/get threw,
        // leaking the producer's network threads and buffers for the rest of the test run.
        try (KafkaProducer createProducer = TestUtils$.MODULE$.createProducer(plaintextBootstrapServers, TestUtils$.MODULE$.createProducer$default$2(), TestUtils$.MODULE$.createProducer$default$3(), TestUtils$.MODULE$.createProducer$default$4(), TestUtils$.MODULE$.createProducer$default$5(), TestUtils$.MODULE$.createProducer$default$6(), TestUtils$.MODULE$.createProducer$default$7(), TestUtils$.MODULE$.createProducer$default$8(), TestUtils$.MODULE$.createProducer$default$9(), TestUtils$.MODULE$.createProducer$default$10(), TestUtils$.MODULE$.createProducer$default$11(), TestUtils$.MODULE$.createProducer$default$12(), TestUtils$.MODULE$.createProducer$default$13(), stringSerializer, stringSerializer2, TestUtils$.MODULE$.createProducer$default$16())) {
            // Build the records for both topics, fire all sends, then block on each future.
            ((List) ((List) ((List) colonVar.map(str -> {
                return new ProducerRecord(this.topic1(), str, str);
            }, List$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce) colonVar2.map(str2 -> {
                return new ProducerRecord(this.topic2(), str2, str2);
            }, List$.MODULE$.canBuildFrom()), List$.MODULE$.canBuildFrom())).map(producerRecord -> {
                return createProducer.send(producerRecord);
            }, List$.MODULE$.canBuildFrom())).foreach(future -> {
                return (RecordMetadata) future.get();
            });
        }
    }

    /**
     * Lambda body: creates a server from a broker config built with {@code i} and the
     * test's {@code zkConnect()}, leaving every other {@code createBrokerConfig} and
     * {@code createServer} argument at its default.
     *
     * @param leaderEpochIntegrationTest test instance supplying {@code zkConnect()}
     * @param i                          first argument of {@code createBrokerConfig}
     *                                   (presumably the broker id — confirm)
     * @return the server returned by {@code TestUtils.createServer}
     */
    public static final /* synthetic */ KafkaServer $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$1(LeaderEpochIntegrationTest leaderEpochIntegrationTest, int i) {
        final TestUtils$ utils = TestUtils$.MODULE$;
        return utils.createServer(
            KafkaConfig$.MODULE$.fromProps(
                utils.createBrokerConfig(
                    i,
                    leaderEpochIntegrationTest.zkConnect(),
                    utils.createBrokerConfig$default$3(),
                    utils.createBrokerConfig$default$4(),
                    utils.createBrokerConfig$default$5(),
                    utils.createBrokerConfig$default$6(),
                    utils.createBrokerConfig$default$7(),
                    utils.createBrokerConfig$default$8(),
                    utils.createBrokerConfig$default$9(),
                    utils.createBrokerConfig$default$10(),
                    utils.createBrokerConfig$default$11(),
                    utils.createBrokerConfig$default$12(),
                    utils.createBrokerConfig$default$13(),
                    utils.createBrokerConfig$default$14(),
                    utils.createBrokerConfig$default$15(),
                    utils.createBrokerConfig$default$16(),
                    utils.createBrokerConfig$default$17(),
                    utils.createBrokerConfig$default$18(),
                    utils.createBrokerConfig$default$19(),
                    utils.createBrokerConfig$default$20(),
                    utils.createBrokerConfig$default$21())),
            utils.createServer$default$2());
    }

    /**
     * Lambda body: checks, via {@code messagesHaveLeaderEpoch}, that messages read from
     * offset 0 carry the epoch currently held in {@code intRef}, using broker 0.
     *
     * @param leaderEpochIntegrationTest test instance owning the brokers
     * @param intRef                     mutable holder of the expected leader epoch
     * @return the result of {@code messagesHaveLeaderEpoch}
     */
    public static final /* synthetic */ boolean $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$3(LeaderEpochIntegrationTest leaderEpochIntegrationTest, IntRef intRef) {
        final KafkaServer firstBroker = (KafkaServer) leaderEpochIntegrationTest.brokers().apply(0);
        final int expectedEpoch = intRef.elem;
        return leaderEpochIntegrationTest.messagesHaveLeaderEpoch(firstBroker, expectedEpoch, 0);
    }

    /**
     * Lambda body: supplies the fixed message "Leader epoch should be 0".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$4() {
        final String message = "Leader epoch should be 0";
        return message;
    }

    /**
     * Lambda body: checks, via {@code messagesHaveLeaderEpoch}, that messages read from
     * offset 4 carry the epoch currently held in {@code intRef}, using broker 0.
     *
     * @param leaderEpochIntegrationTest test instance owning the brokers
     * @param intRef                     mutable holder of the expected leader epoch
     * @return the result of {@code messagesHaveLeaderEpoch}
     */
    public static final /* synthetic */ boolean $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$5(LeaderEpochIntegrationTest leaderEpochIntegrationTest, IntRef intRef) {
        final KafkaServer firstBroker = (KafkaServer) leaderEpochIntegrationTest.brokers().apply(0);
        final int expectedEpoch = intRef.elem;
        return leaderEpochIntegrationTest.messagesHaveLeaderEpoch(firstBroker, expectedEpoch, 4);
    }

    /**
     * Lambda body: supplies the fixed message "Leader epoch should be 1".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader$6() {
        final String message = "Leader epoch should be 1";
        return message;
    }

    /**
     * Lambda body: creates a server from a broker config built with {@code i} and the
     * test's {@code zkConnect()}, leaving every other {@code createBrokerConfig} and
     * {@code createServer} argument at its default.
     *
     * @param leaderEpochIntegrationTest test instance supplying {@code zkConnect()}
     * @param i                          first argument of {@code createBrokerConfig}
     *                                   (presumably the broker id — confirm)
     * @return the server returned by {@code TestUtils.createServer}
     */
    public static final /* synthetic */ KafkaServer $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$1(LeaderEpochIntegrationTest leaderEpochIntegrationTest, int i) {
        final TestUtils$ utils = TestUtils$.MODULE$;
        return utils.createServer(
            KafkaConfig$.MODULE$.fromProps(
                utils.createBrokerConfig(
                    i,
                    leaderEpochIntegrationTest.zkConnect(),
                    utils.createBrokerConfig$default$3(),
                    utils.createBrokerConfig$default$4(),
                    utils.createBrokerConfig$default$5(),
                    utils.createBrokerConfig$default$6(),
                    utils.createBrokerConfig$default$7(),
                    utils.createBrokerConfig$default$8(),
                    utils.createBrokerConfig$default$9(),
                    utils.createBrokerConfig$default$10(),
                    utils.createBrokerConfig$default$11(),
                    utils.createBrokerConfig$default$12(),
                    utils.createBrokerConfig$default$13(),
                    utils.createBrokerConfig$default$14(),
                    utils.createBrokerConfig$default$15(),
                    utils.createBrokerConfig$default$16(),
                    utils.createBrokerConfig$default$17(),
                    utils.createBrokerConfig$default$18(),
                    utils.createBrokerConfig$default$19(),
                    utils.createBrokerConfig$default$20(),
                    utils.createBrokerConfig$default$21())),
            utils.createServer$default$2());
    }

    /**
     * Lambda body: sends one keyless record with payload "IHeartLogs" to partition 0 of
     * {@code topic1()} through the test's shared producer.
     *
     * @param leaderEpochIntegrationTest test instance owning the producer
     * @param i                          unused
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$2(LeaderEpochIntegrationTest leaderEpochIntegrationTest, int i) {
        final KafkaProducer sharedProducer = leaderEpochIntegrationTest.producer();
        final ProducerRecord record = new ProducerRecord(leaderEpochIntegrationTest.topic1(), Predef$.MODULE$.int2Integer(0), (Object) null, "IHeartLogs".getBytes());
        return sharedProducer.send(record);
    }

    /**
     * Lambda body: sends one keyless record with payload "OhAreThey" to partition 1 of
     * {@code topic1()} through the test's shared producer.
     *
     * @param leaderEpochIntegrationTest test instance owning the producer
     * @param i                          unused
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$3(LeaderEpochIntegrationTest leaderEpochIntegrationTest, int i) {
        final KafkaProducer sharedProducer = leaderEpochIntegrationTest.producer();
        final ProducerRecord record = new ProducerRecord(leaderEpochIntegrationTest.topic1(), Predef$.MODULE$.int2Integer(1), (Object) null, "OhAreThey".getBytes());
        return sharedProducer.send(record);
    }

    /**
     * Lambda body: sends one keyless record with payload "IReallyDo" to partition 0 of
     * {@code topic2()} through the test's shared producer.
     *
     * @param leaderEpochIntegrationTest test instance owning the producer
     * @param i                          unused
     * @return the future returned by the producer's {@code send}
     */
    public static final /* synthetic */ Future $anonfun$shouldSendLeaderEpochRequestAndGetAResponse$4(LeaderEpochIntegrationTest leaderEpochIntegrationTest, int i) {
        final KafkaProducer sharedProducer = leaderEpochIntegrationTest.producer();
        final ProducerRecord record = new ProducerRecord(leaderEpochIntegrationTest.topic2(), Predef$.MODULE$.int2Integer(0), (Object) null, "IReallyDo".getBytes());
        return sharedProducer.send(record);
    }

    /**
     * Reads the log end offset of partition {@code tp()} from the local log held by
     * broker 1's replica manager.
     *
     * @return the current log end offset on broker 1
     */
    private final long leo$1() {
        final KafkaServer secondBroker = (KafkaServer) brokers().apply(1);
        final UnifiedLog localLog = (UnifiedLog) secondBroker.replicaManager().localLog(tp()).get();
        return localLog.logEndOffset();
    }

    /**
     * Lambda body: tests whether a partition state carries the given leader epoch.
     *
     * @param i                            leader epoch to compare against
     * @param updateMetadataPartitionState partition state to inspect
     * @return {@code true} when the state's leader epoch equals {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$2(int i, UpdateMetadataRequestData.UpdateMetadataPartitionState updateMetadataPartitionState) {
        final int currentEpoch = updateMetadataPartitionState.leaderEpoch();
        return currentEpoch == i;
    }

    /**
     * Lambda body: asks broker 0's metadata cache for partition {@code (str, i)} and
     * reports whether partition info exists with leader epoch {@code i2}.
     *
     * @param leaderEpochIntegrationTest test instance owning the brokers
     * @param str                        topic name
     * @param i                          partition index
     * @param i2                         leader epoch to look for
     * @return {@code true} when the cache holds the partition with that leader epoch
     */
    public static final /* synthetic */ boolean $anonfun$waitForEpochChangeTo$1(LeaderEpochIntegrationTest leaderEpochIntegrationTest, String str, int i, int i2) {
        final KafkaServer firstBroker = (KafkaServer) leaderEpochIntegrationTest.brokers().apply(0);
        return firstBroker.metadataCache().getPartitionInfo(str, i).exists(partitionState -> BoxesRunTime.boxToBoolean($anonfun$waitForEpochChangeTo$2(i2, partitionState)));
    }

    /**
     * Lambda body: supplies the fixed failure message "Epoch didn't change".
     *
     * @return the message text
     */
    public static final /* synthetic */ String $anonfun$waitForEpochChangeTo$3() {
        final String message = "Epoch didn't change";
        return message;
    }

    /**
     * Lambda body: tests whether a record batch was written under the given leader epoch.
     *
     * @param i           expected partition leader epoch
     * @param recordBatch batch to inspect
     * @return {@code true} when the batch's partition leader epoch equals {@code i}
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$4(int i, RecordBatch recordBatch) {
        final int batchEpoch = recordBatch.partitionLeaderEpoch();
        return i == batchEpoch;
    }

    /**
     * Lambda body: reads a log segment starting at offset {@code i} and checks that every
     * batch returned has partition leader epoch {@code i2}.
     *
     * <p>The segment is read twice: once to detect a {@code null} result and once to
     * obtain the batches.
     *
     * @param i          offset passed to {@code LogSegment.read}
     * @param i2         expected partition leader epoch
     * @param logSegment segment to inspect
     * @return {@code false} when the read yields {@code null}; otherwise whether all
     *         batches match the expected epoch
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$3(int i, int i2, LogSegment logSegment) {
        final boolean nothingRead = logSegment.read(i, Integer.MAX_VALUE, logSegment.read$default$3(), logSegment.read$default$4()) == null;
        if (nothingRead) {
            return false;
        }
        final Iterator batches = (Iterator) CollectionConverters$.MODULE$.asScalaIteratorConverter(logSegment.read(i, Integer.MAX_VALUE, logSegment.read$default$3(), logSegment.read$default$4()).records().batches().iterator()).asScala();
        return batches.forall(batch -> BoxesRunTime.boxToBoolean($anonfun$messagesHaveLeaderEpoch$4(i2, batch)));
    }

    /**
     * Lambda body: checks every segment of one broker's log for the given partition with
     * {@code $anonfun$messagesHaveLeaderEpoch$3}.
     *
     * @param topicPartition partition whose log is inspected
     * @param i              offset from which each segment is read
     * @param i2             expected partition leader epoch
     * @param kafkaServer    broker whose log manager supplies the log
     * @return {@code true} when all segments pass the per-segment check
     */
    public static final /* synthetic */ boolean $anonfun$messagesHaveLeaderEpoch$2(TopicPartition topicPartition, int i, int i2, KafkaServer kafkaServer) {
        final LogManager logs = kafkaServer.getLogManager();
        final UnifiedLog partitionLog = (UnifiedLog) logs.getLog(topicPartition, logs.getLog$default$2()).get();
        return partitionLog.logSegments().iterator().forall(segment -> BoxesRunTime.boxToBoolean($anonfun$messagesHaveLeaderEpoch$3(i, i2, segment)));
    }

    /**
     * Lambda body: folds the check for one topic into the running result flag.
     *
     * <p>For partition 0 of {@code str}, the flag stays {@code true} only if it was
     * already {@code true}, the log end offset on {@code kafkaServer} is greater than 0,
     * and every broker of the test passes {@code $anonfun$messagesHaveLeaderEpoch$2}.
     *
     * @param leaderEpochIntegrationTest test instance owning the brokers
     * @param kafkaServer                broker whose log end offset is checked
     * @param booleanRef                 accumulated result, updated in place
     * @param i                          offset from which segments are read
     * @param i2                         expected partition leader epoch
     * @param str                        topic name
     */
    public static final /* synthetic */ void $anonfun$messagesHaveLeaderEpoch$1(LeaderEpochIntegrationTest leaderEpochIntegrationTest, KafkaServer kafkaServer, BooleanRef booleanRef, int i, int i2, String str) {
        final TopicPartition partition = new TopicPartition(str, 0);
        final LogManager logs = kafkaServer.getLogManager();
        // An earlier topic already failed: the flag stays false and no log is touched.
        if (!booleanRef.elem) {
            return;
        }
        final boolean hasRecords = ((UnifiedLog) logs.getLog(partition, logs.getLog$default$2()).get()).logEndOffset() > 0;
        booleanRef.elem = hasRecords && leaderEpochIntegrationTest.brokers().forall(broker -> BoxesRunTime.boxToBoolean($anonfun$messagesHaveLeaderEpoch$2(partition, i, i2, broker)));
    }
}
