package monix.kafka;

import monix.eval.Task;
import monix.eval.Task$;
import monix.eval.Task$AsyncBuilder$;
import monix.eval.Task$AsyncBuilder$CreatePartiallyApplied$;
import monix.execution.Ack;
import monix.execution.Ack$;
import monix.execution.Ack$AckExtensions$;
import monix.execution.Ack$Stop$;
import monix.execution.Callback;
import monix.execution.Callback$;
import monix.execution.Cancelable;
import monix.execution.Scheduler;
import monix.execution.Scheduler$;
import monix.execution.cancelables.BooleanCancelable;
import monix.execution.cancelables.BooleanCancelable$;
import monix.reactive.Observable;
import monix.reactive.Observer$;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.concurrent.Future$;
import scala.concurrent.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: KafkaConsumerObservableManualCommit.scala */
@ScalaSignature(bytes = "\u0006\u0001\u001d4A!\u0003\u0006\u0003\u001f!Aq\u0006\u0001BC\u0002\u0013E\u0003\u0007\u0003\u00055\u0001\t\u0005\t\u0015!\u00032\u0011!)\u0004A!b\u0001\n#2\u0004\u0002C%\u0001\u0005\u0003\u0005\u000b\u0011B\u001c\t\r)\u0003A\u0011\u0001\u0006L\u0011\u001dy\u0005A1A\u0005\nACa\u0001\u0016\u0001!\u0002\u0013\t\u0006\"B+\u0001\t#2&aI&bM.\f7i\u001c8tk6,'o\u00142tKJ4\u0018M\u00197f\u001b\u0006tW/\u00197D_6l\u0017\u000e\u001e\u0006\u0003\u00171\tQa[1gW\u0006T\u0011!D\u0001\u0006[>t\u0017\u000e_\u0002\u0001+\r\u0001RDK\n\u0004\u0001Ea\u0003c\u0001\n\u0016/5\t1C\u0003\u0002\u0015\u0019\u0005A!/Z1di&4X-\u0003\u0002\u0017'\tQqJY:feZ\f'\r\\3\u0011\taI2$K\u0007\u0002\u0015%\u0011!D\u0003\u0002\u0013\u0007>lW.\u001b;uC\ndW-T3tg\u0006<W\r\u0005\u0002\u001d;1\u0001A!\u0002\u0010\u0001\u0005\u0004y\"!A&\u0012\u0005\u00012\u0003CA\u0011%\u001b\u0005\u0011#\"A\u0012\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0012#a\u0002(pi\"Lgn\u001a\t\u0003C\u001dJ!\u0001\u000b\u0012\u0003\u0007\u0005s\u0017\u0010\u0005\u0002\u001dU\u0011)1\u0006\u0001b\u0001?\t\ta\u000bE\u0003\u0019[mIs#\u0003\u0002/\u0015\t92*\u00194lC\u000e{gn];nKJ|%m]3sm\u0006\u0014G.Z\u0001\u0007G>tg-[4\u0016\u0003E\u0002\"\u0001\u0007\u001a\n\u0005MR!aE&bM.\f7i\u001c8tk6,'oQ8oM&<\u0017aB2p]\u001aLw\rI\u0001\tG>t7/^7feV\tq\u0007E\u00029wuj\u0011!\u000f\u0006\u0003u1\tA!\u001a<bY&\u0011A(\u000f\u0002\u0005)\u0006\u001c8\u000e\u0005\u0003?\u000fnIS\"A 
\u000b\u0005U\u0002%BA!C\u0003\u001d\u0019G.[3oiNT!aC\"\u000b\u0005\u0011+\u0015AB1qC\u000eDWMC\u0001G\u0003\ry'oZ\u0005\u0003\u0011~\u0012QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\u0018!C2p]N,X.\u001a:!\u0003\u0019a\u0014N\\5u}Q\u0019A*\u0014(\u0011\ta\u00011$\u000b\u0005\u0006_\u0015\u0001\r!\r\u0005\u0006k\u0015\u0001\raN\u0001\u0012a>dG\u000eV5nK>,H/T5mY&\u001cX#A)\u0011\u0005\u0005\u0012\u0016BA*#\u0005\u0011auN\\4\u0002%A|G\u000e\u001c+j[\u0016|W\u000f^'jY2L7\u000fI\u0001\bC\u000e\\G+Y:l)\r9fl\u0018\t\u0004qmB\u0006CA-]\u001b\u0005Q&BA.\r\u0003%)\u00070Z2vi&|g.\u0003\u0002^5\n\u0019\u0011iY6\t\u000bUB\u0001\u0019A\u001f\t\u000b\u0001D\u0001\u0019A1\u0002\u0007=,H\u000fE\u0002cK^i\u0011a\u0019\u0006\u0003IN\t\u0011b\u001c2tKJ4XM]:\n\u0005\u0019\u001c'AC*vEN\u001c'/\u001b2fe\u0002")
/* loaded from: input_file:monix/kafka/KafkaConsumerObservableManualCommit.class */
/*
 * Observable over a Kafka consumer that emits every polled record as a
 * CommittableMessage. Each message carries a CommittableOffset bound to a
 * Commit handle, so the offset commit is left to downstream code.
 *
 * NOTE(review): this file is decompiled Scala 2.12 output. Trait forwarders and
 * by-name lambdas were reconstructed by hand where the decompiler produced
 * self-recursive or constant-folded code; confirm against the bytecode before
 * relying on those spots.
 */
public final class KafkaConsumerObservableManualCommit<K, V> extends Observable<CommittableMessage<K, V>> implements KafkaConsumerObservable<K, V, CommittableMessage<K, V>> {
    private final KafkaConsumerConfig config;
    private final Task<KafkaConsumer<K, V>> consumer;
    /** Timeout handed to every {@code poll} call; initialised from {@code config.fetchMaxWaitTime()}. */
    private final long pollTimeoutMillis;

    /**
     * Subscribes {@code subscriber} using the implementation inherited from
     * {@link KafkaConsumerObservable}.
     *
     * @param subscriber downstream subscriber to attach
     * @return the cancelable produced by the inherited subscription logic
     */
    @Override // monix.kafka.KafkaConsumerObservable
    public final Cancelable unsafeSubscribeFn(Subscriber<CommittableMessage<K, V>> subscriber) {
        // An unqualified unsafeSubscribeFn(subscriber) call would re-enter this very method
        // and recurse until StackOverflowError, so the trait implementation is named explicitly.
        // Assumes the trait method is exposed as an interface default method (Scala 2.12+
        // trait encoding) - TODO confirm against the compiled KafkaConsumerObservable.
        return KafkaConsumerObservable.super.unsafeSubscribeFn(subscriber);
    }

    /** @return the consumer configuration supplied at construction time */
    @Override // monix.kafka.KafkaConsumerObservable
    public KafkaConsumerConfig config() {
        return this.config;
    }

    /** @return the task supplied at construction time that yields the Kafka consumer */
    @Override // monix.kafka.KafkaConsumerObservable
    public Task<KafkaConsumer<K, V>> consumer() {
        return this.consumer;
    }

    private long pollTimeoutMillis() {
        return this.pollTimeoutMillis;
    }

    /**
     * Converts a Scala map of partition to offset into the {@code java.util.Map} of
     * partition to {@link OffsetAndMetadata} that the consumer's commit calls accept.
     *
     * @param batch offsets keyed by partition (values are boxed longs)
     * @return a Java view of the converted map
     * @throws MatchError if the map yields a {@code null} entry
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw Scala collection types, as emitted by the decompiler
    private static java.util.Map<TopicPartition, OffsetAndMetadata> toJavaOffsets(Map<TopicPartition, Object> batch) {
        return (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter((scala.collection.Map) batch.map(entry -> {
            if (entry == null) {
                throw new MatchError(entry);
            }
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((TopicPartition) entry._1()), new OffsetAndMetadata(entry._2$mcJ$sp()));
        }, Map$.MODULE$.canBuildFrom())).asJava();
    }

    /**
     * Builds a task that performs one poll-and-feed cycle.
     *
     * <p>When run, the task schedules work on the task's scheduler that, while holding the
     * monitor of {@code kafkaConsumer}, polls it with {@code pollTimeoutMillis}, wraps each
     * record in a {@link CommittableMessage} (offset recorded as {@code record.offset() + 1})
     * and feeds the batch to {@code subscriber}. The task completes with the resulting
     * {@link Ack}, or with {@code Stop} if it was cancelled in the meantime.
     *
     * @param kafkaConsumer consumer to poll; also used as the lock for poll and commit calls
     * @param subscriber    downstream subscriber that receives the messages
     * @return a cancelable task yielding the subscriber's acknowledgement
     */
    @Override // monix.kafka.KafkaConsumerObservable
    public Task<Ack> ackTask(KafkaConsumer<K, V> kafkaConsumer, Subscriber<CommittableMessage<K, V>> subscriber) {
        return Task$AsyncBuilder$CreatePartiallyApplied$.MODULE$.apply$extension(Task$.MODULE$.create(), (scheduler, callback) -> {
            Callback forkedCallback = Callback$.MODULE$.forked(callback, scheduler);
            BooleanCancelable cancelable = BooleanCancelable$.MODULE$.apply();
            // Commit handle attached to every emitted offset; it captures kafkaConsumer directly.
            Commit commit = new Commit() { // from class: monix.kafka.KafkaConsumerObservableManualCommit$$anon$1
                {
                    Commit.$init$(this);
                }

                @Override // monix.kafka.Commit
                public final Task<BoxedUnit> commitBatchAsync(Map<TopicPartition, Object> map) {
                    // Delegate to the trait's single-argument implementation; an unqualified call
                    // would recurse into this override forever.
                    // Assumes Commit exposes it as an interface default method - TODO confirm.
                    return Commit.super.commitBatchAsync(map);
                }

                @Override // monix.kafka.Commit
                public Task<BoxedUnit> commitBatchSync(Map<TopicPartition, Object> map) {
                    return Task$.MODULE$.apply(() -> {
                        return package$.MODULE$.blocking(() -> {
                            // Same monitor as the poll in ackTask, so commit and poll never overlap.
                            synchronized (kafkaConsumer) {
                                kafkaConsumer.commitSync(toJavaOffsets(map));
                            }
                            return BoxedUnit.UNIT;
                        });
                    });
                }

                @Override // monix.kafka.Commit
                public Task<BoxedUnit> commitBatchAsync(Map<TopicPartition, Object> map, OffsetCommitCallback offsetCommitCallback) {
                    return Task$.MODULE$.apply(() -> {
                        return package$.MODULE$.blocking(() -> {
                            synchronized (kafkaConsumer) {
                                kafkaConsumer.commitAsync(toJavaOffsets(map), offsetCommitCallback);
                            }
                            return BoxedUnit.UNIT;
                        });
                    });
                }
            };
            new Scheduler.Extensions(Scheduler$.MODULE$.Extensions(scheduler)).executeAsync(() -> {
                scala.concurrent.Future<Ack> ackFuture;
                try {
                    synchronized (kafkaConsumer) {
                        if (cancelable.isCanceled()) {
                            // Cancelled before polling: skip the poll and signal Stop.
                            ackFuture = Ack$Stop$.MODULE$;
                        } else {
                            ConsumerRecords polled = (ConsumerRecords) package$.MODULE$.blocking(() -> {
                                return kafkaConsumer.poll(this.pollTimeoutMillis());
                            });
                            Iterable messages = (Iterable) ((TraversableLike) JavaConverters$.MODULE$.iterableAsScalaIterableConverter(polled).asScala()).map(consumerRecord -> {
                                return new CommittableMessage(consumerRecord, CommittableOffset$.MODULE$.apply(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset() + 1, commit));
                            }, Iterable$.MODULE$.canBuildFrom());
                            ackFuture = Observer$.MODULE$.feed(subscriber, messages, subscriber.scheduler());
                        }
                    }
                } catch (Throwable th) {
                    // Non-fatal failures become a failed future; fatal ones propagate.
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        throw th;
                    }
                    ackFuture = Future$.MODULE$.failed((Throwable) unapply.get());
                }
                Ack$AckExtensions$.MODULE$.syncOnComplete$extension(Ack$.MODULE$.AckExtensions(ackFuture), result -> {
                    $anonfun$ackTask$5(cancelable, forkedCallback, scheduler, result);
                    return BoxedUnit.UNIT;
                }, scheduler);
            });
            return cancelable;
        }, Task$AsyncBuilder$.MODULE$.forCancelable());
    }

    /**
     * Completion handler for the acknowledgement future created in {@code ackTask}.
     *
     * <p>A {@link Failure} is forwarded to {@code callback.onError}. A {@link Success} is
     * forwarded to {@code callback.onSuccess}, replaced by {@code Stop} when
     * {@code booleanCancelable} is already cancelled.
     *
     * @param booleanCancelable cancellation flag of the running task
     * @param callback          callback to complete
     * @param scheduler         receives non-fatal errors thrown after {@code onSuccess} was invoked
     * @param r7                outcome of the acknowledgement future
     * @throws MatchError if {@code r7} is neither a {@code Success} nor a {@code Failure}
     */
    public static final /* synthetic */ void $anonfun$ackTask$5(BooleanCancelable booleanCancelable, Callback callback, Scheduler scheduler, Try r7) {
        if (r7 instanceof Failure) {
            callback.onError(((Failure) r7).exception());
            return;
        }
        if (!(r7 instanceof Success)) {
            throw new MatchError(r7);
        }
        Ack ack = (Ack) ((Success) r7).value();
        // True only until the callback is about to be signalled. The decompiled guard was the
        // constant `1 != 0`, which left reportFailure unreachable and allowed onError to be
        // called on a callback whose onSuccess had already been invoked.
        boolean streamErrors = true;
        try {
            Ack signal = booleanCancelable.isCanceled() ? Ack$Stop$.MODULE$ : ack;
            streamErrors = false;
            callback.onSuccess(signal);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable nonFatal = (Throwable) unapply.get();
            if (streamErrors) {
                callback.onError(nonFatal);
            } else {
                // onSuccess was already invoked: do not signal the callback a second time.
                scheduler.reportFailure(nonFatal);
            }
        }
    }

    /**
     * @param kafkaConsumerConfig consumer configuration; its {@code fetchMaxWaitTime()} becomes the poll timeout
     * @param task                task yielding the Kafka consumer to read from
     */
    public KafkaConsumerObservableManualCommit(KafkaConsumerConfig kafkaConsumerConfig, Task<KafkaConsumer<K, V>> task) {
        this.config = kafkaConsumerConfig;
        this.consumer = task;
        KafkaConsumerObservable.$init$(this);
        this.pollTimeoutMillis = kafkaConsumerConfig.fetchMaxWaitTime().toMillis();
    }
}
