package kafka.producer;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import junit.framework.Assert;
import kafka.admin.AdminUtils$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.ErrorMapping$;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.Message$;
import kafka.message.MessageSet$;
import kafka.message.NoCompressionCodec$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.TestUtils$;
import kafka.utils.TestZKUtils$;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import org.scalatest.junit.JUnit3Suite;
import scala.Array$;
import scala.Predef$;
import scala.ScalaObject;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.TraitSetter;

/* compiled from: SyncProducerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001E4A!\u0001\u0002\u0001\u000f\t\u00012+\u001f8d!J|G-^2feR+7\u000f\u001e\u0006\u0003\u0007\u0011\t\u0001\u0002\u001d:pIV\u001cWM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011\u0003\u0002\u0001\t%a\u0001\"!\u0003\t\u000e\u0003)Q!a\u0003\u0007\u0002\u000b),h.\u001b;\u000b\u00055q\u0011!C:dC2\fG/Z:u\u0015\u0005y\u0011aA8sO&\u0011\u0011C\u0003\u0002\f\u0015Vs\u0017\u000e^\u001aTk&$X\r\u0005\u0002\u0014-5\tAC\u0003\u0002\u0016\t\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0013\t9BC\u0001\fLC\u001a\\\u0017mU3sm\u0016\u0014H+Z:u\u0011\u0006\u0014h.Z:t!\tIB$D\u0001\u001b\u0015\u0005Y\u0012!B:dC2\f\u0017BA\u000f\u001b\u0005-\u00196-\u00197b\u001f\nTWm\u0019;\t\u000b}\u0001A\u0011\u0001\u0011\u0002\rqJg.\u001b;?)\u0005\t\u0003C\u0001\u0012\u0001\u001b\u0005\u0011\u0001b\u0002\u0013\u0001\u0001\u0004%I!J\u0001\r[\u0016\u001c8/Y4f\u0005f$Xm]\u000b\u0002MA\u0019\u0011dJ\u0015\n\u0005!R\"!B!se\u0006L\bCA\r+\u0013\tY#D\u0001\u0003CsR,\u0007bB\u0017\u0001\u0001\u0004%IAL\u0001\u0011[\u0016\u001c8/Y4f\u0005f$Xm]0%KF$\"a\f\u001a\u0011\u0005e\u0001\u0014BA\u0019\u001b\u0005\u0011)f.\u001b;\t\u000fMb\u0013\u0011!a\u0001M\u0005\u0019\u0001\u0010J\u0019\t\rU\u0002\u0001\u0015)\u0003'\u00035iWm]:bO\u0016\u0014\u0015\u0010^3tA!9q\u0007\u0001b\u0001\n\u0003A\u0014aB2p]\u001aLwm]\u000b\u0002sA\u0019!hP!\u000e\u0003mR!\u0001P\u001f\u0002\u0013%lW.\u001e;bE2,'B\u0001 \u001b\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\u0001n\u0012A\u0001T5tiB\u0011!)R\u0007\u0002\u0007*\u0011A\tB\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005\u0019\u001b%aC&bM.\f7i\u001c8gS\u001eDa\u0001\u0013\u0001!\u0002\u0013I\u0014\u0001C2p]\u001aLwm\u001d\u0011\t\u000f)\u0003!\u0019!C\u0001\u0017\u0006\u0001\"p\\8lK\u0016\u0004XM]\"p]:,7\r^\u000b\u0002\u0019B\u0011QJU\u0007\u0002\u001d*\u0011q\nU\u0001\u0005Y\u0006twMC\u0001R\u0003\u0011Q\u0017M^1\n\u0005Ms%AB*ue&tw\r\u0003\u0004V\u0001\u0001\u0006I\u0001T\u0001\u0012u>|7.Z3qKJ\u001cuN\u001c8fGR\u0004\u0003\"B,\u0001\t\u0003A\u0016a\u0005;fgR\u0014V-Y2iC\ndWmU3sm\u0016\u0014H#A\u0018)\u0005YS\u0006CA.^\u001b\u0005a&BA\u0006\u000f\u0013\tqFL\u0001\u0003UKN$\b\"\u00021\u0001\t\u0003A\u0016a\u0006;fgR,U\u000e\u001d;z!J|G-^2f%\u0016\fX/Z:uQ\ty&\fC\u0003d\u0001\u0011\u0005\u0001,A\fuKN$X*Z:tC\u001e,7+\u001b>f)>|G*\u0019:hK\"\u0012!M\u0017\u0005\u0006M\u0002!\t\u0001W\u0001#i\u0016\u001cH/T3tg\u0006<WmU5{KR{w\u000eT1sO\u0016<\u0016\u000e\u001e5BG.TVM]8)\u0005\u0015T\u0006\"B5\u0001\t\u0003A\u0016\u0001\n;fgR\u0004&o\u001c3vG\u0016\u001cuN\u001d:fGRd\u0017PU3dK&4Xm\u001d*fgB|gn]3)\u0005!T\u0006\"\u00027\u0001\t\u0003A\u0016A\u0006;fgR\u0004&o\u001c3vG\u0016\u00148)\u00198US6,w.\u001e;)\u0005-T\u0006\"B8\u0001\t\u0003A\u0016\u0001\t;fgR\u0004&o\u001c3vG\u0016\u0014V-];fgR<\u0016\u000e\u001e5O_J+7\u000f]8og\u0016D#A\u001c.")
/* loaded from: input_file:kafka/producer/SyncProducerTest.class */
public class SyncProducerTest extends JUnit3Suite implements KafkaServerTestHarness, ScalaObject {
    private byte[] messageBytes;
    private final List<KafkaConfig> configs;
    private final String zookeeperConnect;
    private List<KafkaServer> servers;
    private final String zkConnect;
    private EmbeddedZookeeper zookeeper;
    private ZkClient zkClient;
    private final int zkConnectionTimeout;
    private final int zkSessionTimeout;

    @Override // kafka.integration.KafkaServerTestHarness
    public /* bridge */ List<KafkaServer> servers() {
        return this.servers;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    @TraitSetter
    public /* bridge */ void servers_$eq(List<KafkaServer> list) {
        this.servers = list;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public final /* bridge */ void kafka$integration$KafkaServerTestHarness$$super$setUp() {
        ZooKeeperTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public final /* bridge */ void kafka$integration$KafkaServerTestHarness$$super$tearDown() {
        ZooKeeperTestHarness.Cclass.tearDown(this);
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void setUp() {
        KafkaServerTestHarness.Cclass.setUp(this);
    }

    @Override // kafka.integration.KafkaServerTestHarness, kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void tearDown() {
        KafkaServerTestHarness.Cclass.tearDown(this);
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ String zkConnect() {
        return this.zkConnect;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ EmbeddedZookeeper zookeeper() {
        return this.zookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void zookeeper_$eq(EmbeddedZookeeper embeddedZookeeper) {
        this.zookeeper = embeddedZookeeper;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ ZkClient zkClient() {
        return this.zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void zkClient_$eq(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ int zkConnectionTimeout() {
        return this.zkConnectionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ int zkSessionTimeout() {
        return this.zkSessionTimeout;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final /* bridge */ void kafka$zk$ZooKeeperTestHarness$$super$setUp() {
        super/*junit.framework.TestCase*/.setUp();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public final /* bridge */ void kafka$zk$ZooKeeperTestHarness$$super$tearDown() {
        super/*junit.framework.TestCase*/.tearDown();
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnect_$eq(String str) {
        this.zkConnect = str;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkConnectionTimeout_$eq(int i) {
        this.zkConnectionTimeout = i;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    public /* bridge */ void kafka$zk$ZooKeeperTestHarness$_setter_$zkSessionTimeout_$eq(int i) {
        this.zkSessionTimeout = i;
    }

    private byte[] messageBytes() {
        return this.messageBytes;
    }

    private void messageBytes_$eq(byte[] bArr) {
        this.messageBytes = bArr;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    public List<KafkaConfig> configs() {
        return this.configs;
    }

    public String zookeeperConnect() {
        return this.zookeeperConnect;
    }

    @Test
    public void testReachableServer() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port())));
        long milliseconds = SystemTime$.MODULE$.milliseconds();
        try {
            Assert.assertNotNull(syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7())));
        } catch (Exception e) {
            Assert.fail(new StringBuilder().append("Unexpected failure sending message to broker. ").append(e.getMessage()).toString());
        }
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - milliseconds < 500);
        long milliseconds2 = SystemTime$.MODULE$.milliseconds();
        try {
            Assert.assertNotNull(syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7())));
        } catch (Exception e2) {
            Assert.fail(new StringBuilder().append("Unexpected failure sending message to broker. ").append(e2.getMessage()).toString());
        }
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - milliseconds2 < 500);
        try {
            Assert.assertNotNull(syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7())));
        } catch (Exception e3) {
            Assert.fail(new StringBuilder().append("Unexpected failure sending message to broker. ").append(e3.getMessage()).toString());
        }
    }

    @Test
    public void testEmptyProduceRequest() {
        ProducerResponse send = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port()))).send(new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 1, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$)));
        Assert.assertTrue(send != null);
        Assert.assertTrue(!send.hasError() && send.status().size() == 0);
    }

    @Test
    public void testMessageSizeTooLarge() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port())));
        AdminUtils$.MODULE$.createTopic(zkClient(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "test", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        ProducerResponse send = syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[((KafkaConfig) configs().apply(0)).messageMaxBytes() + 1])})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals(1, send.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$1(this)));
        Assert.assertEquals(ErrorMapping$.MODULE$.MessageSizeTooLargeCode(), ((ProducerResponseStatus) send.status().apply(new TopicAndPartition("test", 0))).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) send.status().apply(new TopicAndPartition("test", 0))).offset());
        ProducerResponse send2 = syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[((((KafkaConfig) configs().apply(0)).messageMaxBytes() - Message$.MODULE$.MessageOverhead()) - MessageSet$.MODULE$.LogOverhead()) - 1])})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        Assert.assertEquals(1, send.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$2(this)));
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("test", 0))).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("test", 0))).offset());
    }

    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        Properties syncProducerConfig = TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port());
        syncProducerConfig.put("request.required.acks", "0");
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(syncProducerConfig));
        AdminUtils$.MODULE$.createTopic(zkClient(), "test", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "test", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[((KafkaConfig) configs().apply(0)).messageMaxBytes() + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        try {
            syncProducer.send(TestUtils$.MODULE$.produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[((KafkaConfig) configs().apply(0)).messageMaxBytes() + 1])})), 0, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7()));
        } catch (IOException e) {
        }
    }

    @Test
    public void testProduceCorrectlyReceivesResponse() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port())));
        ProducerRequest produceRequestWithAcks = TestUtils$.MODULE$.produceRequestWithAcks(Predef$.MODULE$.wrapRefArray(new String[]{"topic1", "topic2", "topic3"}), Predef$.MODULE$.wrapIntArray(Array$.MODULE$.apply(0, Predef$.MODULE$.wrapIntArray(new int[0]))), new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequestWithAcks$default$5(), TestUtils$.MODULE$.produceRequestWithAcks$default$6(), TestUtils$.MODULE$.produceRequestWithAcks$default$7());
        ProducerResponse send = syncProducer.send(produceRequestWithAcks);
        Assert.assertNotNull(send);
        Assert.assertEquals(produceRequestWithAcks.correlationId(), send.correlationId());
        Assert.assertEquals(3, send.status().size());
        send.status().values().foreach(new SyncProducerTest$$anonfun$testProduceCorrectlyReceivesResponse$1(this));
        AdminUtils$.MODULE$.createTopic(zkClient(), "topic1", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "topic1", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        AdminUtils$.MODULE$.createTopic(zkClient(), "topic3", 1, 1, AdminUtils$.MODULE$.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "topic3", 0, 500L, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5());
        ProducerResponse send2 = syncProducer.send(produceRequestWithAcks);
        Assert.assertNotNull(send2);
        Assert.assertEquals(produceRequestWithAcks.correlationId(), send2.correlationId());
        Assert.assertEquals(3, send2.status().size());
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic1", 0))).error());
        Assert.assertEquals(ErrorMapping$.MODULE$.NoError(), ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic3", 0))).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic1", 0))).offset());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic3", 0))).offset());
        Assert.assertEquals(ErrorMapping$.MODULE$.UnknownTopicOrPartitionCode(), ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic2", 0))).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic2", 0))).offset());
    }

    @Test
    public void testProducerCanTimeout() {
        KafkaServer kafkaServer = (KafkaServer) servers().head();
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(kafkaServer.socketServer().port())));
        ProducerRequest produceRequest = TestUtils$.MODULE$.produceRequest("topic1", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, TestUtils$.MODULE$.produceRequest$default$5(), TestUtils$.MODULE$.produceRequest$default$6(), TestUtils$.MODULE$.produceRequest$default$7());
        kafkaServer.requestHandlerPool().shutdown();
        long milliseconds = SystemTime$.MODULE$.milliseconds();
        try {
            syncProducer.send(produceRequest);
            Assert.fail("Should have received timeout exception since request handling is stopped.");
        } catch (SocketTimeoutException e) {
        } catch (Throwable th) {
            Assert.fail(new StringBuilder().append("Unexpected exception when expecting timeout: ").append(th).toString());
        }
        Assert.assertTrue(SystemTime$.MODULE$.milliseconds() - milliseconds >= ((long) 500));
    }

    @Test
    public void testProduceRequestWithNoResponse() {
        Assert.assertTrue(new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(((KafkaServer) servers().head()).socketServer().port()))).send(new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 0, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$))) == null);
    }

    public SyncProducerTest() {
        ZooKeeperTestHarness.Cclass.$init$(this);
        servers_$eq(null);
        this.messageBytes = new byte[2];
        this.configs = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{new KafkaConfig((Properties) TestUtils$.MODULE$.createBrokerConfigs(1).head())}));
        this.zookeeperConnect = TestZKUtils$.MODULE$.zookeeperConnect();
    }
}
