package org.galaxio.gatling.kafka.client;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.gatling.commons.util.Clock;
import io.gatling.core.actor.ActorRef;
import io.gatling.core.actor.ActorSystem;
import io.gatling.core.stats.StatsEngine;
import io.gatling.core.util.NameGen;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.scala.StreamsBuilder;
import org.apache.kafka.streams.scala.StreamsBuilder$;
import org.galaxio.gatling.kafka.Cpackage;
import org.galaxio.gatling.kafka.client.KafkaMessageTracker;
import org.galaxio.gatling.kafka.protocol.KafkaProtocol;
import org.galaxio.gatling.kafka.request.KafkaProtocolMessage;
import org.galaxio.gatling.kafka.request.KafkaProtocolMessage$;
import org.galaxio.gatling.kafka.request.KafkaSerdesImplicits;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Statics;

/* compiled from: KafkaMessageTrackerPool.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005%x!B\r\u001b\u0011\u0003)c!B\u0014\u001b\u0011\u0003A\u0003\"B\u0018\u0002\t\u0003\u0001d\u0001B\u0019\u0002\rIB\u0001bY\u0002\u0003\u0002\u0003\u0006I\u0001\u001a\u0005\tq\u000e\u0011\t\u0011)A\u0005s\"I\u00111A\u0002\u0003\u0002\u0003\u0006I!\u001f\u0005\u000b\u0003\u000b\u0019!\u0011!Q\u0001\n\u0005\u001d\u0001BCA\u000f\u0007\t\u0005\t\u0015!\u0003\u0002 !Q\u0011qG\u0002\u0003\u0002\u0003\u0006I!!\u000f\t\r=\u001aA\u0011AA%\u0011\u001d\tYf\u0001C!\u0003;Bq!!\u001a\u0002\t\u0013\t9GB\u0003(5\t\t)\b\u0003\u0006\u0002\b6\u0011\t\u0011)A\u0005\u0003\u0013C\u0011bZ\u0007\u0003\u0002\u0003\u0006I!a$\t\u0015\u0005UUB!A!\u0002\u0013\t9\n\u0003\u0006\u000285\u0011\t\u0011)A\u0005\u0003sAaaL\u0007\u0005\u0002\u0005\r\u0006\"CAX\u001b\t\u0007I\u0011BAY\u0011!\t9-\u0004Q\u0001\n\u0005M\u0006\"CAe\u001b\t\u0007I\u0011BAf\u0011!\t\t.\u0004Q\u0001\n\u00055\u0007BB2\u000e\t\u0003\t\u0019\u000eC\u0004\u0002^6!I!a8\u0002/-\u000bgm[1NKN\u001c\u0018mZ3Ue\u0006\u001c7.\u001a:Q_>d'BA\u000e\u001d\u0003\u0019\u0019G.[3oi*\u0011QDH\u0001\u0006W\u000647.\u0019\u0006\u0003?\u0001\nqaZ1uY&twM\u0003\u0002\"E\u00059q-\u00197bq&|'\"A\u0012\u0002\u0007=\u0014xm\u0001\u0001\u0011\u0005\u0019\nQ\"\u0001\u000e\u0003/-\u000bgm[1NKN\u001c\u0018mZ3Ue\u0006\u001c7.\u001a:Q_>d7CA\u0001*!\tQS&D\u0001,\u0015\u0005a\u0013!B:dC2\f\u0017B\u0001\u0018,\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012!\n\u0002\u001e\u0017\u000647.Y'fgN\fw-\u001a)s_\u000e,7o]8s'V\u0004\b\u000f\\5feN!1aM\u001eR!\t!\u0014(D\u00016\u0015\t1t'\u0001\u0003mC:<'\"\u0001\u001d\u0002\t)\fg/Y\u0005\u0003uU\u0012aa\u00142kK\u000e$\bC\u0002\u001fG\u0011\"se*D\u0001>\u0015\tqt(A\u0002ba&T!\u0001Q!\u0002\u0013A\u0014xnY3tg>\u0014(B\u0001\"D\u0003\u001d\u0019HO]3b[NT!!\b#\u000b\u0005\u0015\u0013\u0013AB1qC\u000eDW-\u0003\u0002H{\t\t\u0002K]8dKN\u001cxN]*vaBd\u0017.\u001a:\u0011\u0007)J5*\u0003\u0002KW\t)\u0011I\u001d:bsB\u0011!\u0006T\u0005\u0003\u001b.\u0012AAQ=uKB\u0011AgT\u0005\u0003!V\u0012A
AV8jIB\u0011!\u000b\u0019\b\u0003'zs!\u0001V/\u000f\u0005UcfB\u0001,\\\u001d\t9&,D\u0001Y\u0015\tIF%\u0001\u0004=e>|GOP\u0005\u0002G%\u0011\u0011EI\u0005\u0003?\u0001J!!\b\u0010\n\u0005}c\u0012a\u00029bG.\fw-Z\u0005\u0003C\n\u0014AbS1gW\u0006dunZ4j]\u001eT!a\u0018\u000f\u0002\u000fQ\u0014\u0018mY6feB\u0019Q-\\8\u000e\u0003\u0019T!a\u001a5\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0005%T\u0017\u0001B2pe\u0016T!aH6\u000b\u00031\f!![8\n\u000594'\u0001C!di>\u0014(+\u001a4\u0011\u0005A,hBA9t\u001d\t\u0019&/\u0003\u0002\u001c9%\u0011AOG\u0001\u0014\u0017\u000647.Y'fgN\fw-\u001a+sC\u000e\\WM]\u0005\u0003m^\u0014a\u0002\u0016:bG.,'/T3tg\u0006<WM\u0003\u0002u5\u0005Q\u0011N\u001c9viR{\u0007/[2\u0011\u0005ithBA>}!\t96&\u0003\u0002~W\u00051\u0001K]3eK\u001aL1a`A\u0001\u0005\u0019\u0019FO]5oO*\u0011QpK\u0001\f_V$\b/\u001e;U_BL7-\u0001\bnKN\u001c\u0018mZ3NCR\u001c\u0007.\u001a:\u0011\t\u0005%\u0011q\u0003\b\u0005\u0003\u0017\t\tBD\u0002T\u0003\u001bI1!a\u0004\u001d\u0003!\u0001(o\u001c;pG>d\u0017\u0002BA\n\u0003+\tQbS1gW\u0006\u0004&o\u001c;pG>d'bAA\b9%!\u0011\u0011DA\u000e\u00051Y\u0015MZ6b\u001b\u0006$8\r[3s\u0015\u0011\t\u0019\"!\u0006\u0002'I,7\u000f]8og\u0016$&/\u00198tM>\u0014X.\u001a:\u0011\u000b)\n\t#!\n\n\u0007\u0005\r2F\u0001\u0004PaRLwN\u001c\t\bU\u0005\u001d\u00121FA\u0016\u0013\r\tIc\u000b\u0002\n\rVt7\r^5p]F\u0002B!!\f\u000245\u0011\u0011q\u0006\u0006\u0004\u0003ca\u0012a\u0002:fcV,7\u000f^\u0005\u0005\u0003k\tyC\u0001\u000bLC\u001a\\\u0017\r\u0015:pi>\u001cw\u000e\\'fgN\fw-Z\u0001\u0006G2|7m\u001b\t\u0005\u0003w\t)%\u0004\u0002\u0002>)!\u0011qHA!\u0003\u0011)H/\u001b7\u000b\u0007\u0005\r#.A\u0004d_6lwN\\:\n\t\u0005\u001d\u0013Q\b\u0002\u0006\u00072|7m\u001b\u000b\u000f\u0003\u0017\ny%!\u0015\u0002T\u0005U\u0013qKA-!\r\tieA\u0007\u0002\u0003!)1M\u0003a\u0001I\")\u0001P\u0003a\u0001s\"1\u00111\u0001\u0006A\u0002eDq!!\u0002\u000b\u0001\u0004\t9\u0001C\u0004\u0002\u001e)\u0001\r!a\b\t\u000f\u0005]\"\u00021\u0001\u0002:\u0005\u0019q-\u001a;\u0015\u0005\u0005}\u0003c\u000
2\u001f\u0002b!CeJT\u0005\u0004\u0003Gj$!\u0003)s_\u000e,7o]8s\u0003E\u0001(o\\2fgN|'oU;qa2LWM\u001d\u000b\u000f\u0003\u0017\nI'a\u001b\u0002n\u0005=\u0014\u0011OA:\u0011\u0015\u0019G\u00021\u0001e\u0011\u0015AH\u00021\u0001z\u0011\u0019\t\u0019\u0001\u0004a\u0001s\"9\u0011Q\u0001\u0007A\u0002\u0005\u001d\u0001bBA\u000f\u0019\u0001\u0007\u0011q\u0004\u0005\b\u0003oa\u0001\u0019AA\u001d'\u001di\u0011&UA<\u0003\u0003\u0003B!!\u001f\u0002~5\u0011\u00111\u0010\u0006\u0004\u0003\u007fA\u0017\u0002BA@\u0003w\u0012qAT1nK\u001e+g\u000e\u0005\u0003\u0002.\u0005\r\u0015\u0002BAC\u0003_\u0011AcS1gW\u0006\u001cVM\u001d3fg&k\u0007\u000f\\5dSR\u001c\u0018aD:ue\u0016\fWn]*fiRLgnZ:\u0011\u000bi\fY)_\u0015\n\t\u00055\u0015\u0011\u0001\u0002\u0004\u001b\u0006\u0004\bcA3\u0002\u0012&\u0019\u00111\u00134\u0003\u0017\u0005\u001bGo\u001c:TsN$X-\\\u0001\fgR\fGo]#oO&tW\r\u0005\u0003\u0002\u001a\u0006}UBAAN\u0015\r\ti\n[\u0001\u0006gR\fGo]\u0005\u0005\u0003C\u000bYJA\u0006Ti\u0006$8/\u00128hS:,GCCAS\u0003O\u000bI+a+\u0002.B\u0011a%\u0004\u0005\b\u0003\u000f\u0013\u0002\u0019AAE\u0011\u00199'\u00031\u0001\u0002\u0010\"9\u0011Q\u0013\nA\u0002\u0005]\u0005bBA\u001c%\u0001\u0007\u0011\u0011H\u0001\tiJ\f7m[3sgV\u0011\u00111\u0017\t\b\u0003k\u000bi,_Aa\u001b\t\t9L\u0003\u0003\u0002:\u0006m\u0016AC2p]\u000e,(O]3oi*\u0019\u0011qH\u001c\n\t\u0005}\u0016q\u0017\u0002\u0012\u0007>t7-\u001e:sK:$\b*Y:i\u001b\u0006\u0004\b\u0003B3n\u0003\u0007\u00042!!2v\u001d\t13/A\u0005ue\u0006\u001c7.\u001a:tA\u0005I\u0011m\u0019;pe:\u000bW.Z\u000b\u0003\u0003\u001b\u00042\u0001NAh\u0013\tyX'\u0001\u0006bGR|'OT1nK\u0002\"\"\"!1\u0002V\u0006]\u0017\u0011\\An\u0011\u0015Ax\u00031\u0001z\u0011\u0019\t\u0019a\u0006a\u0001s\"9\u0011QA\fA\u0002\u0005\u001d\u0001bBA\u000f/\u0001\u0007\u0011qD\u0001\u0018GJ,\u0017\r^3TiJ,\u0017-\\:Qe>\u0004XM\u001d;jKN,\"!!9\u0011\t\u0005\r\u0018Q]\u0007\u0003\u0003wKA!a:\u0002<\nQ\u0001K]8qKJ$\u0018.Z:")
/* loaded from: input_file:org/galaxio/gatling/kafka/client/KafkaMessageTrackerPool.class */
public final class KafkaMessageTrackerPool implements Cpackage.KafkaLogging, NameGen, KafkaSerdesImplicits {
    // Settings copied into the java.util.Properties handed to each KafkaStreams instance
    // (see createStreamsProperties).
    private final Map<String, Object> streamsSettings;
    // Actor system used to spawn tracker actors and to register the stream-closing
    // termination hook (see tracker).
    private final ActorSystem actor;
    // Passed through to every tracker actor created by tracker(...).
    private final StatsEngine statsEngine;
    // Passed to tracker actors and to the processor supplier built in tracker(...).
    private final Clock clock;
    // One tracker actor per key; the key is the second argument of tracker(...), which is
    // also the topic the stream consumes from.
    private final ConcurrentHashMap<String, ActorRef<KafkaMessageTracker.TrackerMessage>> trackers;
    // Base name handed to genName(...) when naming a new tracker actor.
    private final String actorName;
    // Non-final because it is assigned through the trait setter called from the constructor.
    private Serde<GenericRecord> avroSerde;
    // Non-final because it is assigned through the StrictLogging setter
    // (presumably during StrictLogging.$init$ — TODO confirm).
    private Logger logger;

    /* JADX INFO: Access modifiers changed from: private */
    /* compiled from: KafkaMessageTrackerPool.scala */
    /* loaded from: input_file:org/galaxio/gatling/kafka/client/KafkaMessageTrackerPool$KafkaMessageProcessorSupplier.class */
    /**
     * Supplies the Kafka Streams processor that converts every consumed record into a
     * {@link KafkaMessageTracker.MessageConsumed} notification sent to one tracker actor.
     */
    public static final class KafkaMessageProcessorSupplier implements ProcessorSupplier<byte[], byte[], Void, Void>, Cpackage.KafkaLogging {
        private final ActorRef<KafkaMessageTracker.TrackerMessage> tracker;
        private final String inputTopic;
        private final String outputTopic;
        private final KafkaProtocol.KafkaMatcher messageMatcher;
        private final Option<Function1<KafkaProtocolMessage, KafkaProtocolMessage>> responseTransformer;
        private final Clock clock;
        // Assigned through the StrictLogging setter below, hence not final.
        private Logger logger;

        /**
         * Logs a message through the {@code KafkaLogging} trait implementation.
         *
         * @param function0 lazily evaluated log text
         * @param kafkaProtocolMessage message the log entry refers to
         */
        @Override // org.galaxio.gatling.kafka.Cpackage.KafkaLogging
        public void logMessage(Function0<String> function0, KafkaProtocolMessage kafkaProtocolMessage) {
            // Must target the trait's default implementation explicitly: an unqualified
            // call resolves to this override and recurses until StackOverflowError.
            Cpackage.KafkaLogging.super.logMessage(function0, kafkaProtocolMessage);
        }

        /** Returns the logger installed through the StrictLogging setter. */
        public Logger logger() {
            return this.logger;
        }

        /** Setter invoked by the StrictLogging trait initialiser. */
        public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
            this.logger = logger;
        }

        /**
         * {@code ProcessorSupplier} entry point. The decompiler renamed the original
         * {@code get} to {@link #m11get()}, which left the interface method unimplemented;
         * this override restores it.
         *
         * @return a new record processor, see {@link #m11get()}
         */
        @Override // org.apache.kafka.streams.processor.api.ProcessorSupplier
        public Processor<byte[], byte[], Void, Void> get() {
            return m11get();
        }

        /**
         * Builds the record processor. For each record it wraps key, value and headers in a
         * {@link KafkaProtocolMessage}; if the matcher yields {@code null} the record is
         * logged at error level and dropped, otherwise a {@code MessageConsumed} event
         * (match id, receive timestamp, optionally transformed message) is sent to the tracker.
         */
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public Processor<byte[], byte[], Void, Void> m11get() {
            return record -> {
                Option headers = Option$.MODULE$.apply(record.headers());
                byte[] key = (byte[]) record.key();
                byte[] value = (byte[]) record.value();
                KafkaProtocolMessage message = new KafkaProtocolMessage(key, value, this.inputTopic, this.outputTopic, headers, KafkaProtocolMessage$.MODULE$.apply$default$6());

                // Records the matcher cannot identify are reported and skipped.
                if (this.messageMatcher.responseMatch(message) == null) {
                    if (this.logger().underlying().isErrorEnabled()) {
                        this.logger().underlying().error("no messageMatcher key for read message {}", message.key());
                    }
                    return;
                }

                if (key == null || value == null) {
                    if (this.logger().underlying().isWarnEnabled()) {
                        this.logger().underlying().warn(" --- received message with null key or value");
                    }
                } else if (this.logger().underlying().isTraceEnabled()) {
                    this.logger().underlying().trace(" --- received {} {}", new Object[]{key, value});
                }

                long receivedAt = this.clock.nowMillis();
                // NOTE(review): the matcher is evaluated a second time, exactly as before;
                // it could be hoisted if matchers are confirmed to be side-effect free.
                byte[] matchId = this.messageMatcher.responseMatch(message);
                String keyText = key == null ? "null" : new String(key);
                this.logMessage(() -> {
                    return "Record received key=" + keyText;
                }, message);

                // Apply the optional response transformer, falling back to the raw message.
                KafkaProtocolMessage delivered = (KafkaProtocolMessage) this.responseTransformer.map(transform -> {
                    return (KafkaProtocolMessage) transform.apply(message);
                }).getOrElse(() -> {
                    return message;
                });
                this.tracker.$bang(new KafkaMessageTracker.MessageConsumed(matchId, receivedAt, delivered));
            };
        }

        /**
         * @param actorRef tracker actor that receives the {@code MessageConsumed} events
         * @param str input topic recorded on each created message
         * @param str2 output topic recorded on each created message
         * @param kafkaMatcher matcher used to derive the match id of a consumed record
         * @param option optional transformation applied to the message before delivery
         * @param clock source of the receive timestamp
         */
        public KafkaMessageProcessorSupplier(ActorRef<KafkaMessageTracker.TrackerMessage> actorRef, String str, String str2, KafkaProtocol.KafkaMatcher kafkaMatcher, Option<Function1<KafkaProtocolMessage, KafkaProtocolMessage>> option, Clock clock) {
            this.tracker = actorRef;
            this.inputTopic = str;
            this.outputTopic = str2;
            this.messageMatcher = kafkaMatcher;
            this.responseTransformer = option;
            this.clock = clock;
            // Trait initialisers run after the fields are set, in the original order.
            StrictLogging.$init$(this);
            Cpackage.KafkaLogging.$init$(this);
            Statics.releaseFence();
        }
    }

    /**
     * Returns the {@code String} serde from the {@code KafkaSerdesImplicits} default implementation.
     * The explicit {@code super} target is required: an unqualified call would invoke this
     * override again and recurse until StackOverflowError.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<String> stringSerde() {
        return KafkaSerdesImplicits.super.stringSerde();
    }

    /**
     * Returns the (Scala) long serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Object> longSerde() {
        return KafkaSerdesImplicits.super.longSerde();
    }

    /**
     * Returns the {@code java.lang.Long} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Long> javaLongSerde() {
        return KafkaSerdesImplicits.super.javaLongSerde();
    }

    /**
     * Returns the {@code byte[]} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<byte[]> byteArraySerde() {
        return KafkaSerdesImplicits.super.byteArraySerde();
    }

    /**
     * Returns the {@code Bytes} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Bytes> bytesSerde() {
        return KafkaSerdesImplicits.super.bytesSerde();
    }

    /**
     * Returns the {@code ByteBuffer} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<ByteBuffer> byteBufferSerde() {
        return KafkaSerdesImplicits.super.byteBufferSerde();
    }

    /**
     * Returns the (Scala) short serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Object> shortSerde() {
        return KafkaSerdesImplicits.super.shortSerde();
    }

    /**
     * Returns the {@code java.lang.Short} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Short> javaShortSerde() {
        return KafkaSerdesImplicits.super.javaShortSerde();
    }

    /**
     * Returns the (Scala) float serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Object> floatSerde() {
        return KafkaSerdesImplicits.super.floatSerde();
    }

    /**
     * Returns the {@code java.lang.Float} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Float> javaFloatSerde() {
        return KafkaSerdesImplicits.super.javaFloatSerde();
    }

    /**
     * Returns the (Scala) double serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Object> doubleSerde() {
        return KafkaSerdesImplicits.super.doubleSerde();
    }

    /**
     * Returns the {@code java.lang.Double} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Double> javaDoubleSerde() {
        return KafkaSerdesImplicits.super.javaDoubleSerde();
    }

    /**
     * Returns the (Scala) int serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Object> intSerde() {
        return KafkaSerdesImplicits.super.intSerde();
    }

    /**
     * Returns the {@code java.lang.Integer} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<Integer> javaIntegerSerde() {
        return KafkaSerdesImplicits.super.javaIntegerSerde();
    }

    /**
     * Returns the {@code UUID} serde from the {@code KafkaSerdesImplicits} default implementation.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<UUID> uuidSerde() {
        return KafkaSerdesImplicits.super.uuidSerde();
    }

    /**
     * Returns the session-windowed serde that the {@code KafkaSerdesImplicits} default
     * implementation derives from {@code serde}.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     *
     * @param serde serde of the windowed key type
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public <T> WindowedSerdes.SessionWindowedSerde<T> sessionWindowedSerde(Serde<T> serde) {
        return KafkaSerdesImplicits.super.sessionWindowedSerde(serde);
    }

    /**
     * Returns the serde that the {@code KafkaSerdesImplicits} default implementation
     * resolves for {@code str}.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     *
     * @param str serde identifier passed to the trait (presumably a class name — TODO confirm)
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public <T> Serde<T> serdeClass(String str) {
        return KafkaSerdesImplicits.super.serdeClass(str);
    }

    /**
     * Returns the {@code Consumed} instance that the {@code KafkaSerdesImplicits} default
     * implementation builds from a key serde and a value serde.
     * Delegates through {@code super}; an unqualified call would recurse into this override forever.
     *
     * @param serde key serde
     * @param serde2 value serde
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public <K, V> Consumed<K, V> consumedFromSerde(Serde<K> serde, Serde<V> serde2) {
        return KafkaSerdesImplicits.super.consumedFromSerde(serde, serde2);
    }

    /**
     * Produces a name from {@code str} by forwarding to the static {@code NameGen} trait helper.
     *
     * @param str base name handed to {@code NameGen}
     * @return the name produced by {@code NameGen.genName$}
     */
    public String genName(String str) {
        final String generated = NameGen.genName$(this, str);
        return generated;
    }

    /**
     * Logs a message through the {@code KafkaLogging} trait implementation.
     *
     * @param function0 lazily evaluated log text
     * @param kafkaProtocolMessage message the log entry refers to
     */
    @Override // org.galaxio.gatling.kafka.Cpackage.KafkaLogging
    public void logMessage(Function0<String> function0, KafkaProtocolMessage kafkaProtocolMessage) {
        // Must target the trait's default implementation explicitly: an unqualified call
        // resolves to this override and recurses until StackOverflowError.
        Cpackage.KafkaLogging.super.logMessage(function0, kafkaProtocolMessage);
    }

    /** Returns the Avro {@code GenericRecord} serde stored on this instance. */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public Serde<GenericRecord> avroSerde() {
        final Serde<GenericRecord> current = avroSerde;
        return current;
    }

    /**
     * Trait setter for the Avro serde field; the constructor calls it once.
     *
     * @param serde serde instance to store
     */
    @Override // org.galaxio.gatling.kafka.request.KafkaSerdesImplicits
    public void org$galaxio$gatling$kafka$request$KafkaSerdesImplicits$_setter_$avroSerde_$eq(Serde<GenericRecord> serde) {
        avroSerde = serde;
    }

    /** Returns the logger stored on this instance. */
    public Logger logger() {
        final Logger current = logger;
        return current;
    }

    /**
     * StrictLogging trait setter for the logger field.
     *
     * @param newLogger logger instance to store
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger newLogger) {
        logger = newLogger;
    }

    /** Accessor for the tracker-actor cache. */
    private ConcurrentHashMap<String, ActorRef<KafkaMessageTracker.TrackerMessage>> trackers() {
        final ConcurrentHashMap<String, ActorRef<KafkaMessageTracker.TrackerMessage>> cache = trackers;
        return cache;
    }

    /** Accessor for the base name used when naming tracker actors. */
    private String actorName() {
        final String baseName = actorName;
        return baseName;
    }

    /**
     * Returns the tracker actor cached under {@code str2}, creating it on first use.
     *
     * <p>Creation spawns a new tracker actor, builds a Kafka Streams topology that consumes
     * {@code str2} as raw byte arrays and feeds each record to a processor supplier bound to
     * that actor, starts the stream, and registers a termination hook on the actor system
     * that closes the stream.
     *
     * @param str first topic argument forwarded to the processor supplier
     * @param str2 topic the stream consumes from; also the cache key
     * @param kafkaMatcher matcher forwarded to the processor supplier
     * @param option optional response transformer forwarded to the processor supplier
     * @return the existing or newly created tracker actor for {@code str2}
     */
    public ActorRef<KafkaMessageTracker.TrackerMessage> tracker(String str, String str2, KafkaProtocol.KafkaMatcher kafkaMatcher, Option<Function1<KafkaProtocolMessage, KafkaProtocolMessage>> option) {
        // The mapping function runs at most once per key; `topic` is the same value as str2.
        return trackers().computeIfAbsent(str2, topic -> {
            if (this.logger().underlying().isDebugEnabled()) {
                this.logger().underlying().debug("Computing new tracker for topic {}, there are currently {} other trackers", new Object[]{topic, BoxesRunTime.boxToInteger(this.trackers().size())});
            }

            ActorRef<KafkaMessageTracker.TrackerMessage> trackerActor = this.actor.actorOf(KafkaMessageTracker$.MODULE$.actor(this.genName(this.actorName()), this.statsEngine, this.clock));

            StreamsBuilder topologyBuilder = new StreamsBuilder(StreamsBuilder$.MODULE$.$lessinit$greater$default$1());
            topologyBuilder.stream(topic, this.consumedFromSerde(this.byteArraySerde(), this.byteArraySerde())).process(KafkaMessageTrackerPool$.MODULE$.org$galaxio$gatling$kafka$client$KafkaMessageTrackerPool$$processorSupplier(trackerActor, str, topic, kafkaMatcher, option, this.clock), Nil$.MODULE$);

            KafkaStreams streams = new KafkaStreams(topologyBuilder.build(), this.createStreamsProperties());
            // cleanUp() is invoked before start(), in the original order.
            streams.cleanUp();
            streams.start();

            // Close the stream when the actor system terminates.
            this.actor.registerOnTermination(() -> {
                if (this.logger().underlying().isDebugEnabled()) {
                    this.logger().underlying().debug("Closing stream {}", streams);
                }
                streams.close();
            });
            return trackerActor;
        });
    }

    /**
     * Builds the {@link Properties} for a new {@code KafkaStreams} instance from the
     * configured stream settings.
     *
     * <p>If the settings contain {@code application.id}, a random UUID is appended to its
     * value so that each call yields a distinct application id; otherwise the settings are
     * returned as copied.
     *
     * @return a fresh, independently mutable {@code Properties} instance
     */
    private Properties createStreamsProperties() {
        Properties properties = new Properties();
        properties.putAll(CollectionConverters$.MODULE$.MapHasAsJava(this.streamsSettings).asJava());

        // Look the id up once instead of twice.
        Object configuredId = properties.get("application.id");
        if (configuredId == null) {
            return properties;
        }

        String uniqueId = configuredId.toString() + UUID.randomUUID().toString();
        if (logger().underlying().isDebugEnabled()) {
            logger().underlying().debug("Creating you a unique app id for streams {}", uniqueId);
        }
        properties.put("application.id", uniqueId);
        return properties;
    }

    /**
     * Creates an empty tracker pool.
     *
     * @param map settings later copied into the Kafka Streams properties
     * @param actorSystem actor system used to spawn tracker actors
     * @param statsEngine stats engine handed to each tracker actor
     * @param clock clock handed to tracker actors and processor suppliers
     */
    public KafkaMessageTrackerPool(Map<String, Object> map, ActorSystem actorSystem, StatsEngine statsEngine, Clock clock) {
        this.streamsSettings = map;
        this.actor = actorSystem;
        this.statsEngine = statsEngine;
        this.clock = clock;
        // Scala trait initialisers; presumably StrictLogging.$init$ installs the logger
        // through the generated setter — TODO confirm. Order is kept as compiled.
        StrictLogging.$init$(this);
        Cpackage.KafkaLogging.$init$(this);
        NameGen.$init$(this);
        // Initialises the trait-owned avroSerde field through its generated setter.
        org$galaxio$gatling$kafka$request$KafkaSerdesImplicits$_setter_$avroSerde_$eq(new GenericAvroSerde());
        this.trackers = new ConcurrentHashMap<>();
        this.actorName = "kafkaTrackerActor";
        // Emitted by the Scala compiler at the end of the constructor.
        Statics.releaseFence();
    }
}
