package net.liftmodules.kafkaactors;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import net.liftweb.common.Loggable;
import net.liftweb.common.Logger;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: KafkaActorConsumingThread.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005eb!B\u0001\u0003\u0001\tA!!G&bM.\f\u0017i\u0019;pe\u000e{gn];nS:<G\u000b\u001b:fC\u0012T!a\u0001\u0003\u0002\u0017-\fgm[1bGR|'o\u001d\u0006\u0003\u000b\u0019\t1\u0002\\5gi6|G-\u001e7fg*\tq!A\u0002oKR\u001c2\u0001A\u0005\u0012!\tQq\"D\u0001\f\u0015\taQ\"\u0001\u0003mC:<'\"\u0001\b\u0002\t)\fg/Y\u0005\u0003!-\u0011a\u0001\u00165sK\u0006$\u0007C\u0001\n\u0018\u001b\u0005\u0019\"B\u0001\u000b\u0016\u0003\u0019\u0019w.\\7p]*\u0011aCB\u0001\bY&4Go^3c\u0013\tA2C\u0001\u0005M_\u001e<\u0017M\u00197f\u0011!Q\u0002A!A!\u0002\u0013a\u0012\u0001\u00028b[\u0016\u001c\u0001\u0001\u0005\u0002\u001eG9\u0011a$I\u0007\u0002?)\t\u0001%A\u0003tG\u0006d\u0017-\u0003\u0002#?\u00051\u0001K]3eK\u001aL!\u0001J\u0013\u0003\rM#(/\u001b8h\u0015\t\u0011s\u0004\u0003\u0005(\u0001\t\u0005\t\u0015!\u0003)\u0003)\u0019wN\\:v[\u0016\u0014hI\u001c\t\u0004=%Z\u0013B\u0001\u0016 \u0005%1UO\\2uS>t\u0007\u0007\u0005\u0003-oezT\"A\u0017\u000b\u00059z\u0013\u0001C2p]N,X.\u001a:\u000b\u0005A\n\u0014aB2mS\u0016tGo\u001d\u0006\u0003eM\nQa[1gW\u0006T!\u0001N\u001b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u00051\u0014aA8sO&\u0011\u0001(\f\u0002\u000e\u0017\u000647.Y\"p]N,X.\u001a:\u0011\u0007yQD(\u0003\u0002<?\t)\u0011I\u001d:bsB\u0011a$P\u0005\u0003}}\u0011AAQ=uKB\u0011\u0001)Q\u0007\u0002\u0005%\u0011!I\u0001\u0002\u0015\u0017\u000647.Y'fgN\fw-Z#om\u0016dw\u000e]3\t\u0011\u0011\u0003!\u0011!Q\u0001\n\u0015\u000b1\u0002]1sK:$\u0018i\u0019;peB\u0011\u0001IR\u0005\u0003\u000f\n\u0011!bS1gW\u0006\f5\r^8s\u0011!I\u0005A!A!\u0002\u0013Q\u0015\u0001\u00039pY2$\u0016.\\3\u0011\u0005yY\u0015B\u0001' \u0005\u0011auN\\4\t\u000b9\u0003A\u0011A(\u0002\rqJg.\u001b;?)\u0015\u0001\u0016KU*U!\t\u0001\u0005\u0001C\u0003\u001b\u001b\u0002\u0007A\u0004C\u0003(\u001b\u0002\u0007\u0001\u0006C\u0003E\u001b\u0002\u0007Q\tC\u0003J\u001b\u0002\u0007!\nC\u0004W\u0001\t\u0007I\u0011B,\u0002\r\rdwn]3e+\u0005A\u0006CA-a\u001b\u0005Q&BA.]\u0003\u0019\tGo\\7jG*\u0011QLX\u0001\u000bG>t7-\u001e:sK:$(BA0\u000e\u0003\u0011)H/\u001b7\n\u0005\u0005T&!D!u_6L7MQ8pY\u0016\fg\u000e\u0003\u0004d\u0001\u0001\u0006I\u0001W\u0001\bG2|7/\u001a3!\u000f\u0015)\u0007\u0001#\u0003g\u0003I\u0001VM\u001c3j]\u001e|eMZ:fiNdunY6\u0011\u0005\u001dDW\"\u0001\u0001\u0007\u000b%\u0004\u0001\u0012\u00026\u0003%A+g\u000eZ5oO>3gm]3ug2{7m[\n\u0003Q.\u0004\"A\b7\n\u00055|\"AB!osJ+g\rC\u0003OQ\u0012\u0005q\u000eF\u0001g\u0011\u0019\t\b\u0001)Q\u0005e\u0006\u0019\u0002/\u001a8eS:<wJ\u001a4tKR\u001cu.\\7jiB!1\u000f\u001e<|\u001b\u0005q\u0016BA;_\u0005\ri\u0015\r\u001d\t\u0003ofl\u0011\u0001\u001f\u0006\u0003)EJ!A\u001f=\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]B\u0011A\u0006`\u0005\u0003{6\u0012\u0011c\u00144gg\u0016$\u0018I\u001c3NKR\fG-\u0019;b\u0011\u0019q\u0003\u0001)Q\u0005\u007fB!a$!\u0001,\u0013\r\t\u0019a\b\u0002\u0007\u001fB$\u0018n\u001c8\t\u000f\u0005\u001d\u0001\u0001\"\u0011\u0002\n\u0005\u0019!/\u001e8\u0015\u0005\u0005-\u0001c\u0001\u0010\u0002\u000e%\u0019\u0011qB\u0010\u0003\tUs\u0017\u000e\u001e\u0005\b\u0003'\u0001A\u0011AA\u0005\u0003!\u0019\b.\u001e;e_^t\u0007bBA\f\u0001\u0011\u0005\u0011\u0011D\u0001\u0012C\u0012$\u0007+\u001a8eS:<wJ\u001a4tKR\u001cH\u0003BA\u0006\u00037Aq!!\b\u0002\u0016\u0001\u0007!/\u0001\u0006oK^|eMZ:fiND\u0001\"!\t\u0001A\u0003%\u00111E\u0001\u0015_\u001a47/\u001a;D_6l\u0017\u000e^\"bY2\u0014\u0017mY6\u0013\r\u0005\u0015\u0012\u0011FA\u0018\r\u001d\t9#a\b\u0001\u0003G\u0011A\u0002\u0010:fM&tW-\\3oiz\u00022ACA\u0016\u0013\r\tic\u0003\u0002\u0007\u001f\nTWm\u0019;\u0011\u00071\n\t$C\u0002\u000245\u0012Ac\u00144gg\u0016$8i\\7nSR\u001c\u0015\r\u001c7cC\u000e\\\u0007\u0002CA\u001c\u0001\u0001&I!!\u0003\u0002/\r|W.\\5u\u0003:L\b+\u001a8eS:<wJ\u001a4tKR\u001c\b")
/* loaded from: input_file:net/liftmodules/kafkaactors/KafkaActorConsumingThread.class */
public class KafkaActorConsumingThread extends Thread implements Loggable {
    private final Function0<KafkaConsumer<byte[], KafkaMessageEnvelope>> consumerFn;
    public final KafkaActor net$liftmodules$kafkaactors$KafkaActorConsumingThread$$parentActor;
    private final long pollTime;
    private final AtomicBoolean closed;
    public Map<TopicPartition, OffsetAndMetadata> net$liftmodules$kafkaactors$KafkaActorConsumingThread$$pendingOffsetCommit;
    private Option<KafkaConsumer<byte[], KafkaMessageEnvelope>> consumer;
    public final OffsetCommitCallback net$liftmodules$kafkaactors$KafkaActorConsumingThread$$offsetCommitCallback;
    private volatile KafkaActorConsumingThread$PendingOffsetsLock$ PendingOffsetsLock$module;
    private final transient Logger logger;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    /* JADX WARN: Type inference failed for: r1v2, types: [net.liftmodules.kafkaactors.KafkaActorConsumingThread$PendingOffsetsLock$] */
    private KafkaActorConsumingThread$PendingOffsetsLock$ net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.PendingOffsetsLock$module == null) {
                this.PendingOffsetsLock$module = new Object(this) { // from class: net.liftmodules.kafkaactors.KafkaActorConsumingThread$PendingOffsetsLock$
                };
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.PendingOffsetsLock$module;
        }
    }

    public Logger logger() {
        return this.logger;
    }

    public void net$liftweb$common$Loggable$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    private AtomicBoolean closed() {
        return this.closed;
    }

    public KafkaActorConsumingThread$PendingOffsetsLock$ net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock() {
        return this.PendingOffsetsLock$module == null ? net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock$lzycompute() : this.PendingOffsetsLock$module;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        closed().set(false);
        this.consumer = new Some(this.consumerFn.apply());
        while (!closed().get()) {
            try {
                if (this.consumer.isEmpty()) {
                    throw new IllegalStateException("Consumer has somehow become None. Aborting consumption.");
                }
                commitAnyPendingOffsets();
                Some some = this.consumer;
                if (!(some instanceof Some)) {
                    if (!None$.MODULE$.equals(some)) {
                        throw new MatchError(some);
                    }
                    throw new IllegalStateException("Consumer has somehow become None. Aborting consumption.");
                }
                Iterable iterable = (Iterable) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(((KafkaConsumer) some.x()).poll(this.pollTime)).asScala();
                HashMap hashMap = new HashMap();
                iterable.foreach(new KafkaActorConsumingThread$$anonfun$run$1(this, hashMap));
                if (iterable.nonEmpty()) {
                    this.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$parentActor.$bang(new CommitOffsets(hashMap));
                }
            } catch (WakeupException e) {
                if (!closed().get()) {
                    throw e;
                }
                this.consumer.foreach(new KafkaActorConsumingThread$$anonfun$run$2(this));
                this.consumer = None$.MODULE$;
                return;
            }
        }
    }

    public void shutdown() {
        closed().set(true);
        this.consumer.foreach(new KafkaActorConsumingThread$$anonfun$shutdown$1(this));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [net.liftmodules.kafkaactors.KafkaActorConsumingThread$PendingOffsetsLock$] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8 */
    public void addPendingOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        ?? net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock();
        synchronized (net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock) {
            ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).foreach(new KafkaActorConsumingThread$$anonfun$addPendingOffsets$1(this));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [net.liftmodules.kafkaactors.KafkaActorConsumingThread$PendingOffsetsLock$] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v9 */
    private void commitAnyPendingOffsets() {
        ?? net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock();
        synchronized (net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock) {
            if (this.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$pendingOffsetCommit.isEmpty()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                this.consumer.foreach(new KafkaActorConsumingThread$$anonfun$commitAnyPendingOffsets$1(this));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock;
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public KafkaActorConsumingThread(String str, Function0<KafkaConsumer<byte[], KafkaMessageEnvelope>> function0, KafkaActor kafkaActor, long j) {
        super(str);
        this.consumerFn = function0;
        this.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$parentActor = kafkaActor;
        this.pollTime = j;
        Loggable.class.$init$(this);
        this.closed = new AtomicBoolean(false);
        this.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$pendingOffsetCommit = new HashMap();
        this.consumer = None$.MODULE$;
        this.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$offsetCommitCallback = new OffsetCommitCallback(this) { // from class: net.liftmodules.kafkaactors.KafkaActorConsumingThread$$anon$1
            private final /* synthetic */ KafkaActorConsumingThread $outer;

            /* JADX WARN: Multi-variable type inference failed */
            /* JADX WARN: Type inference failed for: r0v12 */
            /* JADX WARN: Type inference failed for: r0v6, types: [net.liftmodules.kafkaactors.KafkaActorConsumingThread$PendingOffsetsLock$] */
            /* JADX WARN: Type inference failed for: r0v7, types: [java.lang.Throwable] */
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                if (exc == null) {
                    this.$outer.logger().debug(new KafkaActorConsumingThread$$anon$1$$anonfun$onComplete$2(this));
                } else {
                    this.$outer.logger().error(new KafkaActorConsumingThread$$anon$1$$anonfun$onComplete$1(this), exc);
                }
                ?? net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = this.$outer.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock();
                synchronized (net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock) {
                    this.$outer.net$liftmodules$kafkaactors$KafkaActorConsumingThread$$pendingOffsetCommit.clear();
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock = net$liftmodules$kafkaactors$KafkaActorConsumingThread$$PendingOffsetsLock;
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
            }
        };
    }
}
