package kafka.integration;

import java.nio.ByteBuffer;
import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.api.FetchRequest;
import kafka.api.FetchRequest$;
import kafka.api.FetchRequestBuilder;
import kafka.api.FetchResponse;
import kafka.cluster.Replica;
import kafka.common.OffsetOutOfRangeException;
import kafka.common.UnknownTopicOrPartitionException;
import kafka.consumer.SimpleConsumer;
import kafka.integration.KafkaServerTestHarness;
import kafka.integration.ProducerConsumerTestHarness;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRequestHandler;
import kafka.server.KafkaServer;
import kafka.server.ReplicaManager;
import kafka.utils.TestUtils$;
import kafka.utils.Utils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.scalatest.junit.JUnit3Suite;
import scala.Predef$;
import scala.ScalaObject;
import scala.Tuple2;
import scala.collection.LinearSeqOptimized;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.TraitSetter;

/* compiled from: PrimitiveApiTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%b\u0001B\u0001\u0003\u0001\u001d\u0011\u0001\u0003\u0015:j[&$\u0018N^3Ba&$Vm\u001d;\u000b\u0005\r!\u0011aC5oi\u0016<'/\u0019;j_:T\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0015\u0001\u0001B\u0005\f\u001d!\tI\u0001#D\u0001\u000b\u0015\tYA\"A\u0003kk:LGO\u0003\u0002\u000e\u001d\u0005I1oY1mCR,7\u000f\u001e\u0006\u0002\u001f\u0005\u0019qN]4\n\u0005EQ!a\u0003&V]&$8gU;ji\u0016\u0004\"a\u0005\u000b\u000e\u0003\tI!!\u0006\u0002\u00037A\u0013x\u000eZ;dKJ\u001cuN\\:v[\u0016\u0014H+Z:u\u0011\u0006\u0014h.Z:t!\t9\"$D\u0001\u0019\u0015\tIB!\u0001\u0002{W&\u00111\u0004\u0007\u0002\u00155>|7*Z3qKJ$Vm\u001d;ICJtWm]:\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0003}\tQa]2bY\u0006L!!\t\u0010\u0003\u0017M\u001b\u0017\r\\1PE*,7\r\u001e\u0005\u0006G\u0001!\t\u0001J\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\u0015\u0002\"a\u0005\u0001\t\u000f\u001d\u0002!\u0019!C\u0001Q\u0005!\u0001o\u001c:u+\u0005I\u0003CA\u000f+\u0013\tYcDA\u0002J]RDa!\f\u0001!\u0002\u0013I\u0013!\u00029peR\u0004\u0003bB\u0018\u0001\u0005\u0004%\t\u0001M\u0001\u0006aJ|\u0007o]\u000b\u0002cA\u0011!gN\u0007\u0002g)\u0011A'N\u0001\u0005kRLGNC\u00017\u0003\u0011Q\u0017M^1\n\u0005a\u001a$A\u0003)s_B,'\u000f^5fg\"1!\b\u0001Q\u0001\nE\na\u0001\u001d:paN\u0004\u0003b\u0002\u001f\u0001\u0005\u0004%\t!P\u0001\u0007G>tg-[4\u0016\u0003y\u0002\"a\u0010\"\u000e\u0003\u0001S!!\u0011\u0003\u0002\rM,'O^3s\u0013\t\u0019\u0005IA\u0006LC\u001a\\\u0017mQ8oM&<\u0007BB#\u0001A\u0003%a(A\u0004d_:4\u0017n\u001a\u0011\t\u000f\u001d\u0003!\u0019!C\u0001\u0011\u000691m\u001c8gS\u001e\u001cX#A%\u0011\u0007){e(D\u0001L\u0015\taU*A\u0005j[6,H/\u00192mK*\u0011aJH\u0001\u000bG>dG.Z2uS>t\u0017B\u0001)L\u0005\u0011a\u0015n\u001d;\t\rI\u0003\u0001\u0015!\u0003J\u0003!\u0019wN\u001c4jON\u0004\u0003b\u0002+\u0001\u0005\u0004%\t!V\u0001\u0015e\u0016\fX/Z:u\u0011\u0006tG\r\\3s\u0019><w-\u001a:\u0016\u0003Y\u0003\"a\u0016/\u000e\u0003aS!!\u0017.\u0002\u000b1|w\r\u000e6\u000b\u0005ms\u0011AB1qC\u000eDW-\u0003\u000
2^1\n1Aj\\4hKJDaa\u0018\u0001!\u0002\u00131\u0016!\u0006:fcV,7\u000f\u001e%b]\u0012dWM\u001d'pO\u001e,'\u000f\t\u0005\u0006C\u0002!\tAY\u0001%i\u0016\u001cHOR3uG\"\u0014V-];fgR\u001c\u0015M\u001c)s_B,'\u000f\\=TKJL\u0017\r\\5{KR\t1\r\u0005\u0002\u001eI&\u0011QM\b\u0002\u0005+:LG\u000fC\u0003h\u0001\u0011\u0005!-A\u000buKN$X)\u001c9us\u001a+Go\u00195SKF,Xm\u001d;\t\u000b%\u0004A\u0011\u00012\u0002EQ,7\u000f\u001e#fM\u0006,H\u000e^#oG>$WM\u001d)s_\u0012,8-\u001a:B]\u00124U\r^2i\u0011\u0015Y\u0007\u0001\"\u0001c\u0003E\"Xm\u001d;EK\u001a\fW\u000f\u001c;F]\u000e|G-\u001a:Qe>$WoY3s\u0003:$g)\u001a;dQ^KG\u000f[\"p[B\u0014Xm]:j_:DQ!\u001c\u0001\u0005\u0002\t\f\u0001\u0004^3tiB\u0013x\u000eZ;dK\u0006sG-T;mi&4U\r^2i\u0011\u0015y\u0007\u0001\"\u0001c\u0003\u001d\"Xm\u001d;Qe>$WoY3B]\u0012lU\u000f\u001c;j\r\u0016$8\r[,ji\"\u001cu.\u001c9sKN\u001c\u0018n\u001c8\t\u000bE\u0004A\u0011\u00012\u0002!Q,7\u000f^'vYRL\u0007K]8ek\u000e,\u0007\"B:\u0001\t\u0003\u0011\u0017a\b;fgRlU\u000f\u001c;j!J|G-^2f/&$\bnQ8naJ,7o]5p]\")Q\u000f\u0001C\u0001E\u00061B/Z:u\u0007>t7/^7fe\u0016k\u0007\u000f^=U_BL7\rC\u0003x\u0001\u0011\u0005!-\u0001\u000fuKN$\b+\u001b9fY&tW\r\u001a)s_\u0012,8-\u001a*fcV,7\u000f^:\t\u000be\u0004A\u0011\u0001>\u0002A\r\u0014X-\u0019;f'&l\u0007\u000f\\3U_BL7m]!oI\u0006;\u0018-\u001b;MK\u0006$WM\u001d\u000b\u0007Gn\fY!!\n\t\u000bqD\b\u0019A?\u0002\u0011i\\7\t\\5f]R\u00042A`A\u0004\u001b\u0005y(\u0002BA\u0001\u0003\u0007\t\u0001B_6dY&,g\u000e\u001e\u0006\u0004\u0003\u000bq\u0011AB%1\u0013R,7-C\u0002\u0002\n}\u0014\u0001BW6DY&,g\u000e\u001e\u0005\b\u0003\u001bA\b\u0019AA\b\u0003\u0019!x\u000e]5dgB1\u0011\u0011CA\n\u0003/i\u0011!T\u0005\u0004\u0003+i%aA*fcB!\u0011\u0011DA\u0010\u001d\ri\u00121D\u0005\u0004\u0003;q\u0012A\u0002)sK\u0012,g-\u0003\u0003\u0002\"\u0005\r\"AB*ue&twMC\u0002\u0002\u001eyAa!a\ny\u0001\u0004I\u0013\u0001\u00032s_.,'/\u00133")
/* loaded from: input_file:kafka/integration/PrimitiveApiTest.class */
/**
 * Integration test for the low-level produce/fetch API, exercised through a {@link SimpleConsumer}
 * and a {@link Producer} supplied by the {@link ProducerConsumerTestHarness} and
 * {@link ZooKeeperTestHarness} traits.
 *
 * <p>This is the decompiled form of {@code PrimitiveApiTest.scala}. Most of the first half of the
 * class is compiler-generated plumbing that forwards trait members (accessors, setters and
 * {@code super} calls) to backing fields or to the traits' static {@code Cclass} implementations.
 * The bodies of the {@code $$anonfun$} closure classes referenced by the tests are not part of
 * this file.
 *
 * <p>NOTE(review): several trait setters below assign fields declared {@code final}. That mirrors
 * the compiled bytecode but would be rejected by javac; it is left as the decompiler produced it.
 */
public class PrimitiveApiTest extends JUnit3Suite implements ProducerConsumerTestHarness, ZooKeeperTestHarness, ScalaObject {
    // Values initialised by this class's constructor.
    private final int port;
    private final Properties props;
    private final KafkaConfig config;
    private final List<KafkaConfig> configs;
    private final Logger requestHandlerLogger;
    // Backing storage for members declared by ProducerConsumerTestHarness / KafkaServerTestHarness.
    private final String host;
    private Producer<String, String> producer;
    private SimpleConsumer consumer;
    private List<KafkaServer> servers;
    // Backing storage for members declared by ZooKeeperTestHarness.
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    /** Returns the host value stored by the harness trait setter. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ String host() {
        return this.host;
    }

    /** Returns the harness-managed producer. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ Producer<String, String> producer() {
        return this.producer;
    }

    /** Replaces the harness-managed producer. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ void producer_$eq(Producer<String, String> producer) {
        this.producer = producer;
    }

    /** Returns the harness-managed simple consumer. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ SimpleConsumer consumer() {
        return this.consumer;
    }

    /** Replaces the harness-managed simple consumer. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ void consumer_$eq(SimpleConsumer simpleConsumer) {
        this.consumer = simpleConsumer;
    }

    /** Super-call forwarder: delegates to the {@code KafkaServerTestHarness} implementation of {@code setUp}. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public final /* bridge */ void kafka$integration$ProducerConsumerTestHarness$$super$setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
    }

    /** Super-call forwarder: delegates to the {@code KafkaServerTestHarness} implementation of {@code tearDown}. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public final /* bridge */ void kafka$integration$ProducerConsumerTestHarness$$super$tearDown() {
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    /** Trait setter for {@code host}; presumably invoked from the trait's {@code $init$} - TODO confirm. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public /* bridge */ void kafka$integration$ProducerConsumerTestHarness$_setter_$host_$eq(String str) {
        this.host = str;
    }

    /** Test fixture set-up; delegates to the {@code ProducerConsumerTestHarness} implementation. */
    @Override // kafka.integration.ProducerConsumerTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void setUp() {
        ProducerConsumerTestHarness.Cclass.setUp(this);
    }

    /** Test fixture tear-down; delegates to the {@code ProducerConsumerTestHarness} implementation. */
    @Override // kafka.integration.ProducerConsumerTestHarness, kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void tearDown() {
        ProducerConsumerTestHarness.Cclass.tearDown(this);
    }

    /** Returns the list of servers held by the harness (set to {@code null} by the constructor). */
    @Override // kafka.integration.KafkaServerTestHarness
    public /* bridge */ List<KafkaServer> servers() {
        return this.servers;
    }

    /** Replaces the list of servers held by the harness. */
    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public /* bridge */ void servers_$eq(List<KafkaServer> list) {
        this.servers = list;
    }

    /** Super-call forwarder: delegates to the {@code ZooKeeperTestHarness} implementation of {@code setUp}. */
    @Override // kafka.integration.KafkaServerTestHarness
    public final /* bridge */ void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    /** Super-call forwarder: delegates to the {@code ZooKeeperTestHarness} implementation of {@code tearDown}. */
    @Override // kafka.integration.KafkaServerTestHarness
    public final /* bridge */ void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    /** Returns the ZooKeeper connect string stored by the harness trait setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ String zkConnect() {
        return this.zkConnect;
    }

    /** Returns the harness-managed embedded ZooKeeper instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    /** Replaces the harness-managed embedded ZooKeeper instance. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    /** Returns the harness-managed ZooKeeper client. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ ZkClient zkClient() {
        return this.zkClient;
    }

    /** Replaces the harness-managed ZooKeeper client. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    /** Returns the ZooKeeper connection timeout stored by the harness trait setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    /** Returns the ZooKeeper session timeout stored by the harness trait setter. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    /** Super-call forwarder: invokes the superclass {@code setUp}. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public final /* bridge */ void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    /** Super-call forwarder: invokes the superclass {@code tearDown}. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public final /* bridge */ void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    /** Trait setter for {@code zkConnect}; presumably invoked from the trait's {@code $init$} - TODO confirm. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    /** Trait setter for {@code zkConnectionTimeout}; presumably invoked from the trait's {@code $init$} - TODO confirm. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    /** Trait setter for {@code zkSessionTimeout}; presumably invoked from the trait's {@code $init$} - TODO confirm. */
    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    /** Returns the port chosen in the constructor via {@code TestUtils.choosePort()}. */
    @Override // kafka.integration.ProducerConsumerTestHarness
    public int port() {
        return this.port;
    }

    /** Returns the broker properties created in the constructor. */
    public Properties props() {
        return this.props;
    }

    /** Returns the {@link KafkaConfig} built from {@link #props()}. */
    public KafkaConfig config() {
        return this.config;
    }

    /** Returns the single-element list of broker configs handed to the server harness. */
    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    /** Returns the log4j logger for {@link KafkaRequestHandler}, whose level some tests raise temporarily. */
    public Logger requestHandlerLogger() {
        return this.requestHandlerLogger;
    }

    /**
     * Round-trips a {@link FetchRequest} through a {@link ByteBuffer}: the request is written into
     * a buffer sized by {@code sizeInBytes()}, the buffer is rewound, and the request read back by
     * {@code FetchRequest.readFrom} must equal the original.
     */
    public void testFetchRequestCanProperlySerialize() {
        FetchRequest build = new FetchRequestBuilder().clientId("test-client").maxWait(10001).minBytes(4444).addFetch("topic1", 0, 0L, 10000).addFetch("topic2", 1, 1024L, 9999).addFetch("topic1", 1, 256L, 444).build();
        ByteBuffer allocate = ByteBuffer.allocate(build.sizeInBytes());
        build.writeTo(allocate);
        // Reset the position to 0 so readFrom starts at the beginning of what was just written.
        allocate.rewind();
        Assert.assertEquals(build, FetchRequest$.MODULE$.readFrom(allocate));
    }

    /**
     * Sends a {@link FetchRequest} built from the constructor's default arguments and an empty
     * request map, and asserts that the response reports no error and carries no data.
     */
    public void testEmptyFetchRequest() {
        FetchResponse fetch = consumer().fetch(new FetchRequest(FetchRequest$.MODULE$.init$default$1(), FetchRequest$.MODULE$.init$default$2(), FetchRequest$.MODULE$.init$default$3(), FetchRequest$.MODULE$.init$default$4(), FetchRequest$.MODULE$.init$default$5(), FetchRequest$.MODULE$.init$default$6(), Map$.MODULE$.apply(Nil$.MODULE$)));
        Assert.assertTrue(!fetch.hasError() && fetch.data().size() == 0);
    }

    /**
     * Produces one message to {@code test-topic} with a producer configured from the harness
     * producer's properties, checks on the first server that the replica's log end offset is
     * positive and equal to its high watermark, then fetches the message back and verifies the
     * correlation id and the UTF-8 payload.
     *
     * <p>NOTE(review): the producer created here is never closed in this method.
     */
    public void testDefaultEncoderProducerAndFetch() {
        new Producer(new ProducerConfig(producer().config().props().props())).send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("test-topic", "test-message")}));
        ReplicaManager replicaManager = ((KafkaServer) servers().head()).replicaManager();
        Replica replica = (Replica) replicaManager.getReplica("test-topic", 0, replicaManager.getReplica$default$3()).get();
        Assert.assertTrue("HighWatermark should equal logEndOffset with just 1 replica", replica.logEndOffset() > 0 && replica.logEndOffset() == replica.highWatermark());
        FetchResponse fetch = consumer().fetch(new FetchRequestBuilder().clientId("test-client").addFetch("test-topic", 0, 0L, 10000).build());
        Assert.assertEquals("Returned correlationId doesn't match that in request.", 0, fetch.correlationId());
        ByteBufferMessageSet messageSet = fetch.messageSet("test-topic", 0);
        Assert.assertTrue(messageSet.iterator().hasNext());
        Assert.assertEquals("test-message", Utils$.MODULE$.readString(((MessageAndOffset) messageSet.head()).message().payload(), "UTF-8"));
    }

    /**
     * Same produce-then-fetch check as {@link #testDefaultEncoderProducerAndFetch()}, but with the
     * {@code "compression"} property set to {@code "true"} on the producer properties, and without
     * the replica and correlation-id assertions.
     *
     * <p>NOTE(review): if {@code props()} returns the live {@link Properties} rather than a copy,
     * the {@code put} below also alters the harness producer's configuration - confirm. The
     * producer created here is never closed in this method.
     */
    public void testDefaultEncoderProducerAndFetchWithCompression() {
        Properties props = producer().config().props().props();
        props.put("compression", "true");
        new Producer(new ProducerConfig(props)).send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("test-topic", "test-message")}));
        ByteBufferMessageSet messageSet = consumer().fetch(new FetchRequestBuilder().addFetch("test-topic", 0, 0L, 10000).build()).messageSet("test-topic", 0);
        Assert.assertTrue(messageSet.iterator().hasNext());
        Assert.assertEquals("test-message", Utils$.MODULE$.readString(((MessageAndOffset) messageSet.head()).message().payload(), "UTF-8"));
    }

    /**
     * Creates topics {@code test1}..{@code test4}, runs a produce pass and a multi-fetch
     * verification pass over the (topic, 0) pairs, then issues two further fetches that are
     * expected to fail: the first with {@link OffsetOutOfRangeException}, the second with
     * {@link UnknownTopicOrPartitionException}.
     *
     * <p>The per-topic work is done by compiler-generated closure classes that are not visible in
     * this file, so exactly what is produced and asserted there cannot be stated from here.
     *
     * <p>The request-handler logger is raised to {@code FATAL} while the failing fetches run and is
     * reset to {@code ERROR} in a {@code finally} block, so a failed expectation cannot leave the
     * shared logger silenced for later tests.
     */
    public void testProduceAndMultiFetch() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$1(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$2(this, hashMap, fetchRequestBuilder));
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$3(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
        // Raise the logger threshold for the duration of the deliberately failing fetches.
        requestHandlerLogger().setLevel(Level.FATAL);
        try {
            FetchRequestBuilder fetchRequestBuilder2 = new FetchRequestBuilder();
            ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$5(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$6(this, fetchRequestBuilder2));
            try {
                consumer().fetch(fetchRequestBuilder2.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$7(this));
                throw fail("Expected exception when fetching message with invalid offset");
            } catch (OffsetOutOfRangeException expected) {
                // Expected outcome of the invalid-offset fetch; continue with the next check.
            }
            FetchRequestBuilder fetchRequestBuilder3 = new FetchRequestBuilder();
            ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$8(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$9(this, fetchRequestBuilder3));
            try {
                consumer().fetch(fetchRequestBuilder3.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetch$10(this));
                throw fail("Expected exception when fetching message with invalid partition");
            } catch (UnknownTopicOrPartitionException expected) {
                // Expected outcome of the invalid-partition fetch.
            }
        } finally {
            // Always undo the FATAL threshold, including when an expectation above fails.
            requestHandlerLogger().setLevel(Level.ERROR);
        }
    }

    /**
     * Compression variant of {@link #testProduceAndMultiFetch()}: same topic set-up, produce and
     * verify passes, and the same two expected failures ({@link OffsetOutOfRangeException}, then
     * {@link UnknownTopicOrPartitionException}), driven by this test's own closure classes.
     *
     * <p>As in the uncompressed test, the logger level is reset to {@code ERROR} in a
     * {@code finally} block so that a failing expectation does not leave it at {@code FATAL}.
     */
    public void testProduceAndMultiFetchWithCompression() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$1(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$2(this, hashMap, fetchRequestBuilder));
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$3(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
        // Raise the logger threshold for the duration of the deliberately failing fetches.
        requestHandlerLogger().setLevel(Level.FATAL);
        try {
            FetchRequestBuilder fetchRequestBuilder2 = new FetchRequestBuilder();
            ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$5(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$6(this, fetchRequestBuilder2));
            try {
                consumer().fetch(fetchRequestBuilder2.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$7(this));
                throw fail("Expected exception when fetching message with invalid offset");
            } catch (OffsetOutOfRangeException expected) {
                // Expected outcome of the invalid-offset fetch; continue with the next check.
            }
            FetchRequestBuilder fetchRequestBuilder3 = new FetchRequestBuilder();
            ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$8(this))).foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$9(this, fetchRequestBuilder3));
            try {
                consumer().fetch(fetchRequestBuilder3.build()).data().values().foreach(new PrimitiveApiTest$$anonfun$testProduceAndMultiFetchWithCompression$10(this));
                throw fail("Expected exception when fetching message with invalid partition");
            } catch (UnknownTopicOrPartitionException expected) {
                // Expected outcome of the invalid-partition fetch.
            }
        } finally {
            // Always undo the FATAL threshold, including when an expectation above fails.
            requestHandlerLogger().setLevel(Level.ERROR);
        }
    }

    /**
     * Creates topics {@code test1}..{@code test4}, runs a per-topic closure that receives the
     * message map and the fetch builder, sends an empty message list through the harness producer,
     * and then verifies the result of a single multi-topic fetch in a second closure.
     *
     * <p>The closure bodies are not visible in this file.
     */
    public void testMultiProduce() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        // This list is never added to in the visible code, so the send below passes an empty list.
        Nil$ nil$ = Nil$.MODULE$;
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testMultiProduce$1(this))).foreach(new PrimitiveApiTest$$anonfun$testMultiProduce$2(this, hashMap, fetchRequestBuilder));
        producer().send(nil$);
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testMultiProduce$3(this))).foreach(new PrimitiveApiTest$$anonfun$testMultiProduce$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    /**
     * Compression variant of {@link #testMultiProduce()}, driven by its own closure classes.
     *
     * <p>Unlike {@link #testMultiProduce()}, this method does not call
     * {@link #createSimpleTopicsAndAwaitLeader} first. NOTE(review): presumably it relies on the
     * broker auto-creating the topics - confirm.
     */
    public void testMultiProduceWithCompression() {
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        // This list is never added to in the visible code, so the send below passes an empty list.
        Nil$ nil$ = Nil$.MODULE$;
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$1(this))).foreach(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$2(this, hashMap, fetchRequestBuilder));
        producer().send(nil$);
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$3(this))).foreach(new PrimitiveApiTest$$anonfun$testMultiProduceWithCompression$4(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    /**
     * Creates {@code new-topic}, waits for its metadata to propagate and for a leader to be
     * elected for partition 0, then asserts that fetching from offset 0 returns no messages.
     */
    public void testConsumerEmptyTopic() {
        // Arguments (1, 1) are presumably partition count and replication factor - TODO confirm.
        AdminUtils$.MODULE$.createTopic(zkClient(), "new-topic", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated(servers(), "new-topic", 0, 1000L);
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        Assert.assertFalse(consumer().fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0L, 10000).build()).messageSet("new-topic", 0).iterator().hasNext());
    }

    /**
     * Creates topics {@code test1}..{@code test4}, produces through a new producer configured with
     * {@code request.required.acks=0}, waits on eight conditions (four with a 1000 ms limit, four
     * bounded by the configured high-watermark checkpoint interval), and finally verifies a
     * multi-topic fetch.
     *
     * <p>The wait conditions and per-topic work are implemented in closure classes that are not
     * visible in this file.
     *
     * <p>NOTE(review): if {@code props()} returns the live {@link Properties} rather than a copy,
     * the {@code put} below also alters the harness producer's configuration - confirm. The
     * producer created here is never closed in this method.
     */
    public void testPipelinedProduceRequests() {
        createSimpleTopicsAndAwaitLeader(zkClient(), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"test1", "test2", "test3", "test4"})), config().brokerId());
        Properties props = producer().config().props().props();
        props.put("request.required.acks", "0");
        Producer producer = new Producer(new ProducerConfig(props));
        List apply = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("test4", BoxesRunTime.boxToInteger(0)), new Tuple2("test1", BoxesRunTime.boxToInteger(0)), new Tuple2("test2", BoxesRunTime.boxToInteger(0)), new Tuple2("test3", BoxesRunTime.boxToInteger(0))}));
        HashMap hashMap = new HashMap();
        FetchRequestBuilder fetchRequestBuilder = new FetchRequestBuilder();
        // Unused in the visible code; kept as emitted by the decompiler.
        Nil$ nil$ = Nil$.MODULE$;
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$9(this))).foreach(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$10(this, producer, hashMap, fetchRequestBuilder));
        // First group of waits: each condition is given up to 1000 ms.
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$1(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$2(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$3(this), 1000L);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$4(this), 1000L);
        int brokerId = ((KafkaServer) servers().head()).config().brokerId();
        // Second group of waits: bounded by the broker's high-watermark checkpoint interval.
        long replicaHighWatermarkCheckpointIntervalMs = config().replicaHighWatermarkCheckpointIntervalMs();
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$5(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$6(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$7(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        TestUtils$.MODULE$.waitUntilTrue(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$8(this, brokerId), replicaHighWatermarkCheckpointIntervalMs);
        ((LinearSeqOptimized) apply.filter(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$11(this))).foreach(new PrimitiveApiTest$$anonfun$testPipelinedProduceRequests$12(this, hashMap, consumer().fetch(fetchRequestBuilder.build())));
    }

    /**
     * Applies the topic-creation closure to every name in {@code seq}.
     *
     * @param zkClient ZooKeeper client handed to the per-topic closure
     * @param seq      topic names to process
     * @param i        broker id supplied by callers; not passed to the closure in the visible code
     */
    public void createSimpleTopicsAndAwaitLeader(ZkClient zkClient, Seq<String> seq, int i) {
        seq.foreach(new PrimitiveApiTest$$anonfun$createSimpleTopicsAndAwaitLeader$1(this, zkClient));
    }

    /**
     * Runs the trait initialisers, then sets up the broker configuration used by the harness: a
     * port from {@code TestUtils.choosePort()}, broker properties created for id 0 on that port,
     * the derived {@link KafkaConfig}, a one-element config list, and the
     * {@link KafkaRequestHandler} logger.
     */
    public PrimitiveApiTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        servers_$eq(null);
        ProducerConsumerTestHarness.Cclass.$init$(this);
        this.port = TestUtils$.MODULE$.choosePort();
        this.props = TestUtils$.MODULE$.createBrokerConfig(0, port());
        this.config = new KafkaConfig(props());
        this.configs = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{config()}));
        this.requestHandlerLogger = Logger.getLogger(KafkaRequestHandler.class);
    }
}
