package kafka.server;

import java.io.File;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import kafka.api.FetchRequestBuilder;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.I0Itec.zkclient.exception.ZkException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Array$;
import scala.Option$;
import scala.Predef$;
import scala.collection.Iterable$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: ServerShutdownTest.scala */
@ScalaSignature(bytes = "\u0006\u0001A4A!\u0001\u0002\u0001\u000f\t\u00112+\u001a:wKJ\u001c\u0006.\u001e;e_^tG+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tIA\"D\u0001\u000b\u0015\tYA!\u0001\u0002{W&\u0011QB\u0003\u0002\u00155>|7*Z3qKJ$Vm\u001d;ICJtWm]:\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0001\u0004%\t!F\u0001\u0007G>tg-[4\u0016\u0003Y\u0001\"AE\f\n\u0005a\u0011!aC&bM.\f7i\u001c8gS\u001eDqA\u0007\u0001A\u0002\u0013\u00051$\u0001\u0006d_:4\u0017nZ0%KF$\"\u0001\b\u0012\u0011\u0005u\u0001S\"\u0001\u0010\u000b\u0003}\tQa]2bY\u0006L!!\t\u0010\u0003\tUs\u0017\u000e\u001e\u0005\bGe\t\t\u00111\u0001\u0017\u0003\rAH%\r\u0005\u0007K\u0001\u0001\u000b\u0015\u0002\f\u0002\u000f\r|gNZ5hA!9q\u0005\u0001b\u0001\n\u0003A\u0013\u0001\u00025pgR,\u0012!\u000b\t\u0003U=j\u0011a\u000b\u0006\u0003Y5\nA\u0001\\1oO*\ta&\u0001\u0003kCZ\f\u0017B\u0001\u0019,\u0005\u0019\u0019FO]5oO\"1!\u0007\u0001Q\u0001\n%\nQ\u0001[8ti\u0002Bq\u0001\u000e\u0001C\u0002\u0013\u0005\u0001&A\u0003u_BL7\r\u0003\u00047\u0001\u0001\u0006I!K\u0001\u0007i>\u0004\u0018n\u0019\u0011\t\u000fa\u0002!\u0019!C\u0001s\u0005)1/\u001a8ucU\t!\bE\u0002<\u0001&j\u0011\u0001\u0010\u0006\u0003{y\n\u0011\"[7nkR\f'\r\\3\u000b\u0005}r\u0012AC2pY2,7\r^5p]&\u0011\u0011\t\u0010\u0002\u0005\u0019&\u001cH\u000f\u0003\u0004D\u0001\u0001\u0006IAO\u0001\u0007g\u0016tG/\r\u0011\t\u000f\u0015\u0003!\u0019!C\u0001s\u0005)1/\u001a8ue!1q\t\u0001Q\u0001\ni\naa]3oiJ\u0002\u0003\"B%\u0001\t\u0003R\u0015!B:fiV\u0003H#\u0001\u000f)\u0005!c\u0005CA'S\u001b\u0005q%BA(Q\u0003\u0015QWO\\5u\u0015\u0005\t\u0016aA8sO&\u00111K\u0014\u0002\u0007\u0005\u00164wN]3\t\u000bU\u0003A\u0011\u0001&\u0002#Q,7\u000f^\"mK\u0006t7\u000b[;uI><h\u000e\u000b\u0002U/B\u0011Q\nW\u0005\u00033:\u0013A\u0001V3ti\")1\f\u0001C\u0001\u0015\u00069C/Z:u\u00072,\u0017M\\*ikR$wn\u001e8XSRDG)\u001a7fi\u0016$v\u000e]5d\u000b:\f'\r\\3eQ\t
Qv\u000bC\u0003_\u0001\u0011\u0005!*A\u0012uKN$8\t\\3b]NCW\u000f\u001e3po:\fe\r^3s\r\u0006LG.\u001a3Ti\u0006\u0014H/\u001e9)\u0005u;\u0006BB1\u0001A\u0013%!-\u0001\fjg:{g\u000eR1f[>t7*\u00194lCRC'/Z1e)\t\u0019g\r\u0005\u0002\u001eI&\u0011QM\b\u0002\b\u0005>|G.Z1o\u0011\u00159\u0007\r1\u0001i\u0003\u0005!\bC\u0001\u0016j\u0013\tQ7F\u0001\u0004UQJ,\u0017\r\u001a\u0005\u0006Y\u0002!\tAS\u0001\u001dm\u0016\u0014\u0018NZ=O_:$\u0015-Z7p]RC'/Z1egN#\u0018\r^;t\u0011\u0015q\u0007\u0001\"\u0001K\u0003]!Xm\u001d;D_:\u001cXmY;uSZ,7\u000b[;uI><h\u000e\u000b\u0002n/\u0002")
/* loaded from: input_file:kafka/server/ServerShutdownTest.class */
/**
 * Tests that a {@code KafkaServer} can be started, shut down and restarted, and that no
 * non-daemon thread whose name starts with this class's name is still alive afterwards.
 *
 * <p>Every test builds its broker against the ZooKeeper connection string returned by
 * {@code zkConnect()}, which is inherited from {@link ZooKeeperTestHarness}.
 */
public class ServerShutdownTest extends ZooKeeperTestHarness {
    /** Broker configuration built in {@link #setUp()}; {@code null} before that. */
    private KafkaConfig config = null;
    /** Host the {@link SimpleConsumer} connects to. */
    private final String host = "localhost";
    /** Topic that the clean-shutdown test produces to and fetches from. */
    private final String topic = "test";
    /** Messages produced before the broker is restarted. */
    private final List<String> sent1 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"hello", "there"}));
    /** Messages produced after the broker has been restarted. */
    private final List<String> sent2 = List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"more", "messages"}));

    /** Returns the broker configuration created by {@link #setUp()}. */
    public KafkaConfig config() {
        return this.config;
    }

    /** Scala-style setter for {@link #config()}. */
    public void config_$eq(KafkaConfig kafkaConfig) {
        this.config = kafkaConfig;
    }

    /** Returns the host name used by the test consumer. */
    public String host() {
        return this.host;
    }

    /** Returns the name of the topic used by the tests. */
    public String topic() {
        return this.topic;
    }

    /** Returns the first batch of messages (sent before the restart). */
    public List<String> sent1() {
        return this.sent1;
    }

    /** Returns the second batch of messages (sent after the restart). */
    public List<String> sent2() {
        return this.sent2;
    }

    /** Runs the harness set-up, then builds the default configuration for broker 0. */
    @Override // kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
        config_$eq(KafkaConfig$.MODULE$.fromProps(defaultBrokerProps()));
    }

    /**
     * Produces to a broker, shuts it down, checks that a non-empty recovery-point checkpoint
     * file exists in every log directory, restarts the broker and verifies that the messages
     * sent before the restart, and new ones sent after it, can be fetched.
     *
     * <p>Consumer, producer and broker are released in a {@code finally} block so that a
     * failing assertion does not leave a running broker behind.
     */
    @Test
    public void testCleanShutdown() {
        KafkaServer server = newServer(config(), true);
        KafkaProducer<Integer, String> producer = null;
        SimpleConsumer consumer = null;
        try {
            server.startup();
            producer = createProducer$1(server);
            TestUtils$.MODULE$.createTopic(zkUtils(), topic(), 1, 1, singleServerSeq(server), TestUtils$.MODULE$.createTopic$default$6());
            sendAndAwait(producer, sent1());

            // Do a clean shutdown and check that the checkpoint file was written out.
            server.shutdown();
            final KafkaServer stoppedServer = server;
            config().logDirs().foreach(logDir -> {
                assertRecoveryPointCheckpointWritten(stoppedServer, logDir);
                return BoxedUnit.UNIT;
            });
            producer.close();
            producer = null; // already closed; keep the finally block from closing it again

            // Restart with the same configuration (this time with the default third constructor argument).
            server = newServer(config(), false);
            server.startup();
            TestUtils$.MODULE$.waitUntilMetadataIsPropagated(singleServerSeq(server), topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
            producer = createProducer$1(server);
            consumer = new SimpleConsumer(host(), TestUtils$.MODULE$.boundPort(server, TestUtils$.MODULE$.boundPort$default$2()), 1000000, 65536, "");

            // Poll from offset 0 until the pre-restart messages are visible.
            // NOTE(review): this loop has no timeout and polls with maxWait(0).
            ByteBufferMessageSet fetched;
            do {
                fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, 0L, 10000).maxWait(0).build()).messageSet(topic(), 0);
            } while (fetched == null || fetched.validBytes() == 0);
            Assert.assertEquals(sent1(), payloadsOf(fetched));
            long nextOffset = ((MessageAndOffset) fetched.last()).nextOffset();

            sendAndAwait(producer, sent2());

            // Poll from the offset following the first batch until the new messages are visible.
            do {
                fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic(), 0, nextOffset, 10000).build()).messageSet(topic(), 0);
            } while (fetched == null || fetched.validBytes() == 0);
            Assert.assertEquals(sent2(), payloadsOf(fetched));
        } finally {
            if (consumer != null) {
                consumer.close();
            }
            if (producer != null) {
                producer.close();
            }
            shutdownIfRunning(server);
        }
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    /** Starts and stops a broker that has {@code delete.topic.enable=true} and checks for leftover threads. */
    @Test
    public void testCleanShutdownWithDeleteTopicEnabled() {
        Properties brokerProps = defaultBrokerProps();
        brokerProps.setProperty("delete.topic.enable", "true");
        KafkaServer server = newServer(KafkaConfig$.MODULE$.fromProps(brokerProps), true);
        server.startup();
        server.shutdown();
        server.awaitShutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    /**
     * Points the broker at an unresolvable ZooKeeper host, expects {@code startup()} to throw a
     * {@link ZkException}, and checks that the broker ends up in the {@code NotRunning} state
     * without leftover non-daemon threads.
     */
    @Test
    public void testCleanShutdownAfterFailedStartup() {
        Properties brokerProps = defaultBrokerProps();
        brokerProps.setProperty("zookeeper.connect", "fakehostthatwontresolve:65535");
        KafkaServer server = newServer(KafkaConfig$.MODULE$.fromProps(brokerProps), true);
        try {
            server.startup();
            throw fail("Expected KafkaServer setup to fail and throw exception", new Position("ServerShutdownTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 132));
        } catch (ZkException expected) {
            Assert.assertEquals(NotRunning$.MODULE$.state(), server.brokerState().currentState());
        } finally {
            shutdownIfRunning(server);
            server.awaitShutdown();
        }
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        verifyNonDaemonThreadsStatus();
    }

    /**
     * Tells whether {@code thread} is a live, non-daemon thread whose name starts with this
     * test class's name.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean isNonDaemonKafkaThread(Thread thread) {
        return !thread.isDaemon() && thread.isAlive() && thread.getName().startsWith(getClass().getName());
    }

    /** Asserts that no thread matching {@link #isNonDaemonKafkaThread(Thread)} currently exists. */
    public void verifyNonDaemonThreadsStatus() {
        long lingering = Thread.getAllStackTraces().keySet().stream()
                .filter(this::isNonDaemonKafkaThread)
                .count();
        Assert.assertEquals(0L, lingering);
    }

    /**
     * Calls {@code shutdown()} a second time on a broker that has already shut down; the test
     * passes when neither call throws.
     */
    @Test
    public void testConsecutiveShutdown() {
        KafkaServer server = newServer(config(), false);
        server.startup();
        server.shutdown();
        server.awaitShutdown();
        server.shutdown();
        // Remove the log directories, as the other tests in this class do.
        CoreUtils$.MODULE$.delete(server.config().logDirs());
    }

    /** Builds the default properties for broker 0 using this harness's ZooKeeper connection string. */
    private Properties defaultBrokerProps() {
        return TestUtils$.MODULE$.createBrokerConfig(
                0,
                zkConnect(),
                TestUtils$.MODULE$.createBrokerConfig$default$3(),
                TestUtils$.MODULE$.createBrokerConfig$default$4(),
                TestUtils$.MODULE$.createBrokerConfig$default$5(),
                TestUtils$.MODULE$.createBrokerConfig$default$6(),
                TestUtils$.MODULE$.createBrokerConfig$default$7(),
                TestUtils$.MODULE$.createBrokerConfig$default$8(),
                TestUtils$.MODULE$.createBrokerConfig$default$9(),
                TestUtils$.MODULE$.createBrokerConfig$default$10(),
                TestUtils$.MODULE$.createBrokerConfig$default$11(),
                TestUtils$.MODULE$.createBrokerConfig$default$12(),
                TestUtils$.MODULE$.createBrokerConfig$default$13(),
                TestUtils$.MODULE$.createBrokerConfig$default$14(),
                TestUtils$.MODULE$.createBrokerConfig$default$15(),
                TestUtils$.MODULE$.createBrokerConfig$default$16());
    }

    /**
     * Creates (but does not start) a broker.
     *
     * @param brokerConfig configuration to run the broker with
     * @param useClassNameArgument when {@code true} the third constructor argument is this
     *     class's name wrapped in an {@code Option}; otherwise the constructor default is used.
     *     NOTE(review): presumably this is the thread-name prefix that
     *     {@link #isNonDaemonKafkaThread(Thread)} matches on - confirm against KafkaServer.
     */
    private KafkaServer newServer(KafkaConfig brokerConfig, boolean useClassNameArgument) {
        if (useClassNameArgument) {
            return new KafkaServer(brokerConfig, KafkaServer$.MODULE$.$lessinit$greater$default$2(), Option$.MODULE$.apply(getClass().getName()), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        }
        return new KafkaServer(brokerConfig, KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
    }

    /** Calls {@code shutdown()} unless the broker already reports the {@code NotRunning} state. */
    private static void shutdownIfRunning(KafkaServer server) {
        if (server.brokerState().currentState() != NotRunning$.MODULE$.state()) {
            server.shutdown();
        }
    }

    /** Wraps a single broker in a Scala {@code Seq}, as the {@code TestUtils} helpers expect. */
    @SuppressWarnings("unchecked") // Seq$.apply is seen as a raw Seq from Java; the only element is a KafkaServer
    private static Seq<KafkaServer> singleServerSeq(KafkaServer server) {
        return (Seq<KafkaServer>) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaServer[]{server}));
    }

    /**
     * Sends every message to {@link #topic()} with key {@code 0}, then blocks until each send
     * has completed. All sends are issued before the first one is awaited.
     *
     * @throws IllegalStateException if a send fails or the wait is interrupted
     */
    private void sendAndAwait(KafkaProducer<Integer, String> producer, List<String> messages) {
        final java.util.List<Future<RecordMetadata>> pending = new ArrayList<>();
        messages.foreach(message ->
                pending.add(producer.send(new ProducerRecord<Integer, String>(this.topic(), Integer.valueOf(0), message))));
        for (Future<RecordMetadata> ack : pending) {
            try {
                ack.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a send to topic " + topic() + " to complete", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Failed to send a message to topic " + topic(), e);
            }
        }
    }

    /** Decodes the payload of every fetched message into a string, keeping fetch order. */
    private static Object payloadsOf(ByteBufferMessageSet messages) {
        return messages.map(
                entry -> TestUtils$.MODULE$.readString(entry.message().payload(), TestUtils$.MODULE$.readString$default$2()),
                Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Creates a producer with Integer keys and String values for the given broker.
     * NOTE(review): the fifth argument is overridden to 5, presumably the retry count -
     * confirm against TestUtils.createNewProducer.
     */
    private static final KafkaProducer<Integer, String> createProducer$1(KafkaServer kafkaServer) {
        return TestUtils$.MODULE$.createNewProducer(
                TestUtils$.MODULE$.getBrokerListStrFromServers(singleServerSeq(kafkaServer), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2()),
                TestUtils$.MODULE$.createNewProducer$default$2(),
                TestUtils$.MODULE$.createNewProducer$default$3(),
                TestUtils$.MODULE$.createNewProducer$default$4(),
                5,
                TestUtils$.MODULE$.createNewProducer$default$6(),
                TestUtils$.MODULE$.createNewProducer$default$7(),
                TestUtils$.MODULE$.createNewProducer$default$8(),
                TestUtils$.MODULE$.createNewProducer$default$9(),
                TestUtils$.MODULE$.createNewProducer$default$10(),
                new IntegerSerializer(),
                new StringSerializer(),
                TestUtils$.MODULE$.createNewProducer$default$13());
    }

    /** Asserts that the recovery-point checkpoint file in {@code logDir} exists and is not empty. */
    private static void assertRecoveryPointCheckpointWritten(KafkaServer server, String logDir) {
        File checkpoint = new File(logDir, server.logManager().RecoveryPointCheckpointFile());
        Assert.assertTrue("Recovery point checkpoint file does not exist: " + checkpoint, checkpoint.exists());
        Assert.assertTrue("Recovery point checkpoint file is empty: " + checkpoint, checkpoint.length() > 0);
    }

    /**
     * Compiler-generated lambda body, kept with its original signature for compatibility.
     * It checks the checkpoint file of the server held in {@code objectRef}.
     */
    public static final /* synthetic */ void $anonfun$testCleanShutdown$3(ObjectRef objectRef, String str) {
        assertRecoveryPointCheckpointWritten((KafkaServer) objectRef.elem, str);
    }
}
