package kafka.api.test;

import java.util.concurrent.Future;
import kafka.api.FetchRequestBuilder;
import kafka.consumer.SimpleConsumer;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Assert;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Option;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.LinearSeqOptimized;
import scala.collection.TraversableLike;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: ProducerSendTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005mc\u0001B\u0001\u0003\u0001%\u0011\u0001\u0003\u0015:pIV\u001cWM]*f]\u0012$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u0002;fgRT!!\u0002\u0004\u0002\u0007\u0005\u0004\u0018NC\u0001\b\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019B\u0001\u0001\u0006\u00155A\u00111BE\u0007\u0002\u0019)\u0011QBD\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0003\u001fA\t\u0011b]2bY\u0006$Xm\u001d;\u000b\u0003E\t1a\u001c:h\u0013\t\u0019BBA\u0006K+:LGoM*vSR,\u0007CA\u000b\u0019\u001b\u00051\"BA\f\u0007\u0003-Ig\u000e^3he\u0006$\u0018n\u001c8\n\u0005e1\"AF&bM.\f7+\u001a:wKJ$Vm\u001d;ICJtWm]:\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\u0017M\u001b\u0017\r\\1PE*,7\r\u001e\u0005\u0006C\u0001!\tAI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003\r\u0002\"\u0001\n\u0001\u000e\u0003\tAqA\n\u0001C\u0002\u0013\u0005q%\u0001\u0006ok6\u001cVM\u001d<feN,\u0012\u0001\u000b\t\u00037%J!A\u000b\u000f\u0003\u0007%sG\u000f\u0003\u0004-\u0001\u0001\u0006I\u0001K\u0001\f]Vl7+\u001a:wKJ\u001c\b\u0005C\u0004/\u0001\t\u0007I\u0011A\u0018\u0002\u000f\r|gNZ5hgV\t\u0001\u0007E\u00022maj\u0011A\r\u0006\u0003gQ\n\u0011\"[7nkR\f'\r\\3\u000b\u0005Ub\u0012AC2pY2,7\r^5p]&\u0011qG\r\u0002\u0005\u0019&\u001cH\u000f\u0005\u0002:y5\t!H\u0003\u0002<\r\u000511/\u001a:wKJL!!\u0010\u001e\u0003\u0017-\u000bgm[1D_:4\u0017n\u001a\u0005\u0007\u007f\u0001\u0001\u000b\u0011\u0002\u0019\u0002\u0011\r|gNZ5hg\u0002Bq!\u0011\u0001A\u0002\u0013%!)A\u0005d_:\u001cX/\\3scU\t1\t\u0005\u0002E\u000f6\tQI\u0003\u0002G\r\u0005A1m\u001c8tk6,'/\u0003\u0002I\u000b\nq1+[7qY\u0016\u001cuN\\:v[\u0016\u0014\bb\u0002&\u0001\u0001\u0004%IaS\u0001\u000eG>t7/^7feFzF%Z9\u0015\u00051{\u0005CA\u000eN\u0013\tqED\u0001\u0003V]&$\bb\u0002)J\u0003\u0003\u0005\raQ\u0001\u0004q\u0012\n\u0004B\u0002*\u0001A\u0003&1)\u0001\u0006d_:\u001cX/\\3sc\u0001Bq\u0001\u0016\u0001A\u0002\u0013%!)A\u0005d_:\u001cX/\\3se!9a\u000b\u0001a\u0001\n\u00139\u0016!D2p]N,X.\u001a:3?\u0012*\u0017\u000f\u0006\u0002M1\"9\u0001+VA\u0001\u0002\u0004\u0019\u0005B\u0002.\u0001A\u0003&1)\u0001\u0006d_:\u001cX/\\3se\u0001Bq\u0001\u0018\u0001C\u0002\u0013%Q,A\u0003u_BL7-F\u0001_!\tyF-D\u0001a\u0015\t\t'-\u0001\u0003mC:<'\"A2\u0002\t)\fg/Y\u0005\u0003K\u0002\u0014aa\u0015;sS:<\u0007BB4\u0001A\u0003%a,\u0001\u0004u_BL7\r\t\u0005\bS\u0002\u0011\r\u0011\"\u0003(\u0003)qW/\u001c*fG>\u0014Hm\u001d\u0005\u0007W\u0002\u0001\u000b\u0011\u0002\u0015\u0002\u00179,XNU3d_J$7\u000f\t\u0005\u0006[\u0002!\tE\\\u0001\u0006g\u0016$X\u000b\u001d\u000b\u0002\u0019\")\u0001\u000f\u0001C!]\u0006AA/Z1s\t><hN\u0002\u0003s\u0001\u0001\u0019(AE\"iK\u000e\\WI\u001d:pe\u000e\u000bG\u000e\u001c2bG.\u001cB!\u001d;x5A\u0011q,^\u0005\u0003m\u0002\u0014aa\u00142kK\u000e$\bc\u0001=\u0002\u00025\t\u0011P\u0003\u0002{w\u0006A\u0001O]8ek\u000e,'O\u0003\u0002}{\u000691\r\\5f]R\u001c(BA\u0004\u007f\u0015\ty\b#\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0004\u0003\u0007I(\u0001C\"bY2\u0014\u0017mY6\t\r\u0005\nH\u0011AA\u0004)\t\tI\u0001E\u0002\u0002\fEl\u0011\u0001\u0001\u0005\b\u0003\u001f\tH\u0011AA\t\u00031ygnQ8na2,G/[8o)\u0015a\u00151CA\u000f\u0011!\t)\"!\u0004A\u0002\u0005]\u0011\u0001C7fi\u0006$\u0017\r^1\u0011\u0007a\fI\"C\u0002\u0002\u001ce\u0014aBU3d_J$W*\u001a;bI\u0006$\u0018\r\u0003\u0005\u0002 \u00055\u0001\u0019AA\u0011\u0003%)\u0007pY3qi&|g\u000e\u0005\u0003\u0002$\u0005Mb\u0002BA\u0013\u0003_qA!a\n\u0002.5\u0011\u0011\u0011\u0006\u0006\u0004\u0003WA\u0011A\u0002\u001fs_>$h(C\u0001\u001e\u0013\r\t\t\u0004H\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t)$a\u000e\u0003\u0013\u0015C8-\u001a9uS>t'bAA\u00199!1\u00111\b\u0001\u0005\u00029\fa\u0002^3tiN+g\u000eZ(gMN,G\u000f\u000b\u0003\u0002:\u0005}\u0002\u0003BA!\u0003\u000bj!!a\u0011\u000b\u00055\u0001\u0012\u0002BA$\u0003\u0007\u0012A\u0001V3ti\"1\u00111\n\u0001\u0005\u00029\f\u0011\u0002^3ti\u000ecwn]3)\t\u0005%\u0013q\b\u0005\u0007\u0003#\u0002A\u0011\u00018\u0002'Q,7\u000f^*f]\u0012$v\u000eU1si&$\u0018n\u001c8)\t\u0005=\u0013q\b\u0005\u0007\u0003/\u0002A\u0011\u00018\u0002'Q,7\u000f^!vi>\u001c%/Z1uKR{\u0007/[2)\t\u0005U\u0013q\b")
/* loaded from: input_file:kafka/api/test/ProducerSendTest.class */
public class ProducerSendTest extends JUnit3Suite implements KafkaServerTestHarness {
    private final int numServers;
    private final List<KafkaConfig> configs;
    private SimpleConsumer consumer1;
    private SimpleConsumer consumer2;
    private final String kafka$api$test$ProducerSendTest$$topic;
    private final int numRecords;
    private List<KafkaServer> servers;
    private String brokerList;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    /* compiled from: ProducerSendTest.scala */
    /* loaded from: input_file:kafka/api/test/ProducerSendTest$CheckErrorCallback.class */
    public class CheckErrorCallback implements Callback, ScalaObject {
        public final ProducerSendTest $outer;

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                throw kafka$api$test$ProducerSendTest$CheckErrorCallback$$$outer().fail("Send callback returns the following exception", exc);
            }
        }

        public ProducerSendTest kafka$api$test$ProducerSendTest$CheckErrorCallback$$$outer() {
            return this.$outer;
        }

        public CheckErrorCallback(ProducerSendTest producerSendTest) {
            if (producerSendTest == null) {
                throw new NullPointerException();
            }
            this.$outer = producerSendTest;
        }
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaServer> servers() {
        return this.servers;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public void servers_$eq(List<KafkaServer> list) {
        this.servers = list;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public String brokerList() {
        return this.brokerList;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public void brokerList_$eq(String str) {
        this.brokerList = str;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public final void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public final void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    public int numServers() {
        return this.numServers;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    private SimpleConsumer consumer1() {
        return this.consumer1;
    }

    private void consumer1_$eq(SimpleConsumer simpleConsumer) {
        this.consumer1 = simpleConsumer;
    }

    private SimpleConsumer consumer2() {
        return this.consumer2;
    }

    private void consumer2_$eq(SimpleConsumer simpleConsumer) {
        this.consumer2 = simpleConsumer;
    }

    public final String kafka$api$test$ProducerSendTest$$topic() {
        return this.kafka$api$test$ProducerSendTest$$topic;
    }

    private int numRecords() {
        return this.numRecords;
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
        consumer1_$eq(new SimpleConsumer("localhost", ((KafkaConfig) configs().apply(0)).port(), 100, 1048576, ""));
        consumer2_$eq(new SimpleConsumer("localhost", ((KafkaConfig) configs().apply(1)).port(), 100, 1048576, ""));
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public void tearDown() {
        consumer1().close();
        consumer2().close();
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    @Test
    public void testSendOffset() {
        ObjectRef objectRef = new ObjectRef(TestUtils$.MODULE$.createNewProducer(brokerList(), TestUtils$.MODULE$.createNewProducer$default$2(), TestUtils$.MODULE$.createNewProducer$default$3(), TestUtils$.MODULE$.createNewProducer$default$4(), TestUtils$.MODULE$.createNewProducer$default$5(), TestUtils$.MODULE$.createNewProducer$default$6()));
        CheckErrorCallback checkErrorCallback = new CheckErrorCallback(this);
        try {
            TestUtils$.MODULE$.createTopic(zkClient(), kafka$api$test$ProducerSendTest$$topic(), 1, 2, servers(), TestUtils$.MODULE$.createTopic$default$6());
            ProducerRecord producerRecord = new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), new Integer(0), "key".getBytes(), "value".getBytes());
            Assert.assertEquals("Should have offset 0", BoxesRunTime.boxToLong(0L), BoxesRunTime.boxToLong(((RecordMetadata) ((KafkaProducer) objectRef.elem).send(producerRecord, checkErrorCallback).get()).offset()));
            Assert.assertEquals("Should have offset 1", BoxesRunTime.boxToLong(1L), BoxesRunTime.boxToLong(((RecordMetadata) ((KafkaProducer) objectRef.elem).send(new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), new Integer(0), "key".getBytes(), (byte[]) null), checkErrorCallback).get()).offset()));
            Assert.assertEquals("Should have offset 2", BoxesRunTime.boxToLong(2L), BoxesRunTime.boxToLong(((RecordMetadata) ((KafkaProducer) objectRef.elem).send(new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), new Integer(0), (byte[]) null, "value".getBytes()), checkErrorCallback).get()).offset()));
            Assert.assertEquals("Should have offset 3", BoxesRunTime.boxToLong(3L), BoxesRunTime.boxToLong(((RecordMetadata) ((KafkaProducer) objectRef.elem).send(new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), (Integer) null, "key".getBytes(), "value".getBytes()), checkErrorCallback).get()).offset()));
            try {
                try {
                    ((KafkaProducer) objectRef.elem).send(new ProducerRecord((String) null, new Integer(0), "key".getBytes(), "value".getBytes()), checkErrorCallback);
                    throw fail("Should not allow sending a record without topic");
                } catch (Throwable th) {
                    throw fail("Only expecting IllegalArgumentException", th);
                }
            } catch (IllegalArgumentException e) {
                Predef$.MODULE$.intWrapper(1).to(numRecords()).foreach(new ProducerSendTest$$anonfun$testSendOffset$1(this, objectRef, producerRecord));
                Assert.assertEquals(new StringBuilder().append("Should have offset ").append(BoxesRunTime.boxToInteger(numRecords() + 4)).toString(), BoxesRunTime.boxToLong(numRecords() + 4), BoxesRunTime.boxToLong(((RecordMetadata) ((KafkaProducer) objectRef.elem).send(producerRecord, checkErrorCallback).get()).offset()));
                if (((KafkaProducer) objectRef.elem) != null) {
                    ((KafkaProducer) objectRef.elem).close();
                    objectRef.elem = null;
                }
            }
        } catch (Throwable th2) {
            if (((KafkaProducer) objectRef.elem) != null) {
                ((KafkaProducer) objectRef.elem).close();
                objectRef.elem = null;
            }
            throw th2;
        }
    }

    @Test
    public void testClose() {
        ObjectRef objectRef = new ObjectRef(TestUtils$.MODULE$.createNewProducer(brokerList(), TestUtils$.MODULE$.createNewProducer$default$2(), TestUtils$.MODULE$.createNewProducer$default$3(), TestUtils$.MODULE$.createNewProducer$default$4(), TestUtils$.MODULE$.createNewProducer$default$5(), TestUtils$.MODULE$.createNewProducer$default$6()));
        try {
            TestUtils$.MODULE$.createTopic(zkClient(), kafka$api$test$ProducerSendTest$$topic(), 1, 2, servers(), TestUtils$.MODULE$.createTopic$default$6());
            ProducerRecord producerRecord = new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), (Integer) null, "key".getBytes(), "value".getBytes());
            Predef$.MODULE$.intWrapper(1).to(numRecords()).foreach(new ProducerSendTest$$anonfun$testClose$1(this, objectRef, producerRecord));
            Future send = ((KafkaProducer) objectRef.elem).send(producerRecord);
            ((KafkaProducer) objectRef.elem).close();
            objectRef.elem = null;
            Assert.assertTrue("The last message should be acked before producer is shutdown", send.isDone());
            Assert.assertEquals(new StringBuilder().append("Should have offset ").append(BoxesRunTime.boxToInteger(numRecords())).toString(), BoxesRunTime.boxToLong(numRecords()), BoxesRunTime.boxToLong(((RecordMetadata) send.get()).offset()));
        } finally {
            if (((KafkaProducer) objectRef.elem) != null) {
                ((KafkaProducer) objectRef.elem).close();
                objectRef.elem = null;
            }
        }
    }

    @Test
    public void testSendToPartition() {
        ObjectRef objectRef = new ObjectRef(TestUtils$.MODULE$.createNewProducer(brokerList(), TestUtils$.MODULE$.createNewProducer$default$2(), TestUtils$.MODULE$.createNewProducer$default$3(), TestUtils$.MODULE$.createNewProducer$default$4(), TestUtils$.MODULE$.createNewProducer$default$5(), TestUtils$.MODULE$.createNewProducer$default$6()));
        try {
            Option option = (Option) TestUtils$.MODULE$.createTopic(zkClient(), kafka$api$test$ProducerSendTest$$topic(), 2, 2, servers(), TestUtils$.MODULE$.createTopic$default$6()).apply(BoxesRunTime.boxToInteger(1));
            Assert.assertTrue("Leader for topic \"topic\" partition 1 should exist", option.isDefined());
            List list = ((IndexedSeq) Predef$.MODULE$.intWrapper(1).to(numRecords()).map(new ProducerSendTest$$anonfun$2(this, objectRef, 1), IndexedSeq$.MODULE$.canBuildFrom())).toList();
            list.map(new ProducerSendTest$$anonfun$testSendToPartition$2(this), List$.MODULE$.canBuildFrom());
            list.foreach(new ProducerSendTest$$anonfun$testSendToPartition$3(this));
            ((LinearSeqOptimized) ((TraversableLike) list.zip(Predef$.MODULE$.intWrapper(0).until(numRecords()), List$.MODULE$.canBuildFrom())).filter(new ProducerSendTest$$anonfun$testSendToPartition$4(this))).foreach(new ProducerSendTest$$anonfun$testSendToPartition$5(this, 1));
            Buffer buffer = (BoxesRunTime.unboxToInt(option.get()) == ((KafkaConfig) configs().apply(0)).brokerId() ? consumer1().fetch(new FetchRequestBuilder().addFetch(kafka$api$test$ProducerSendTest$$topic(), 1, 0L, Integer.MAX_VALUE).build()) : consumer2().fetch(new FetchRequestBuilder().addFetch(kafka$api$test$ProducerSendTest$$topic(), 1, 0L, Integer.MAX_VALUE).build())).messageSet(kafka$api$test$ProducerSendTest$$topic(), 1).iterator().toBuffer();
            Assert.assertEquals(new StringBuilder().append("Should have fetched ").append(BoxesRunTime.boxToInteger(numRecords())).append(" messages").toString(), BoxesRunTime.boxToInteger(numRecords()), BoxesRunTime.boxToInteger(buffer.size()));
            Predef$.MODULE$.intWrapper(0).to(numRecords() - 1).foreach$mVc$sp(new ProducerSendTest$$anonfun$testSendToPartition$1(this, buffer));
        } finally {
            if (((KafkaProducer) objectRef.elem) != null) {
                ((KafkaProducer) objectRef.elem).close();
                objectRef.elem = null;
            }
        }
    }

    @Test
    public void testAutoCreateTopic() {
        KafkaProducer createNewProducer = TestUtils$.MODULE$.createNewProducer(brokerList(), TestUtils$.MODULE$.createNewProducer$default$2(), TestUtils$.MODULE$.createNewProducer$default$3(), TestUtils$.MODULE$.createNewProducer$default$4(), TestUtils$.MODULE$.createNewProducer$default$5(), 5);
        try {
            Assert.assertEquals("Should have offset 0", BoxesRunTime.boxToLong(0L), BoxesRunTime.boxToLong(((RecordMetadata) createNewProducer.send(new ProducerRecord(kafka$api$test$ProducerSendTest$$topic(), (Integer) null, "key".getBytes(), "value".getBytes())).get()).offset()));
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), kafka$api$test$ProducerSendTest$$topic(), 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
            if (createNewProducer != null) {
                createNewProducer.close();
            }
        } catch (Throwable th) {
            if (createNewProducer != null) {
                createNewProducer.close();
            }
            throw th;
        }
    }

    public ProducerSendTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        KafkaServerTestHarness.Cclass.$init$(this);
        this.numServers = 2;
        this.configs = (List) TestUtils$.MODULE$.createBrokerConfigs(numServers(), false).map(new ProducerSendTest$$anonfun$1(this), List$.MODULE$.canBuildFrom());
        this.consumer1 = null;
        this.consumer2 = null;
        this.kafka$api$test$ProducerSendTest$$topic = "topic";
        this.numRecords = 100;
    }
}
