package monix.kafka;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import monix.eval.Callback;
import monix.eval.Coeval;
import monix.eval.Task;
import monix.eval.Task$;
import monix.execution.Ack;
import monix.execution.Ack$Continue$;
import monix.execution.Ack$Stop$;
import monix.execution.Scheduler;
import monix.execution.cancelables.AssignableCancelable;
import monix.execution.cancelables.AssignableCancelable$;
import monix.reactive.Consumer;
import monix.reactive.Observable;
import monix.reactive.observers.Subscriber;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.List$;
import scala.concurrent.Future;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: KafkaProducerSink.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005me\u0001B\u0001\u0003\u0005\u001d\u0011\u0011cS1gW\u0006\u0004&o\u001c3vG\u0016\u00148+\u001b8l\u0015\t\u0019A!A\u0003lC\u001a\\\u0017MC\u0001\u0006\u0003\u0015iwN\\5y\u0007\u0001)2\u0001\u0003\u0019;'\u0015\u0001\u0011bD J!\tQQ\"D\u0001\f\u0015\u0005a\u0011!B:dC2\f\u0017B\u0001\b\f\u0005\u0019\te.\u001f*fMB!\u0001cE\u000b=\u001b\u0005\t\"B\u0001\n\u0005\u0003!\u0011X-Y2uSZ,\u0017B\u0001\u000b\u0012\u0005!\u0019uN\\:v[\u0016\u0014\bc\u0001\f\u001fC9\u0011q\u0003\b\b\u00031mi\u0011!\u0007\u0006\u00035\u0019\ta\u0001\u0010:p_Rt\u0014\"\u0001\u0007\n\u0005uY\u0011a\u00029bG.\fw-Z\u0005\u0003?\u0001\u00121aU3r\u0015\ti2\u0002\u0005\u0003#Y9JT\"A\u0012\u000b\u0005\u0011*\u0013\u0001\u00039s_\u0012,8-\u001a:\u000b\u0005\u0019:\u0013aB2mS\u0016tGo\u001d\u0006\u0003\u0007!R!!\u000b\u0016\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0013aA8sO&\u0011Qf\t\u0002\u000f!J|G-^2feJ+7m\u001c:e!\ty\u0003\u0007\u0004\u0001\u0005\u000bE\u0002!\u0019\u0001\u001a\u0003\u0003-\u000b\"a\r\u001c\u0011\u0005)!\u0014BA\u001b\f\u0005\u001dqu\u000e\u001e5j]\u001e\u0004\"AC\u001c\n\u0005aZ!aA!osB\u0011qF\u000f\u0003\u0006w\u0001\u0011\rA\r\u0002\u0002-B\u0011!\"P\u0005\u0003}-\u0011A!\u00168jiB\u0011\u0001iR\u0007\u0002\u0003*\u0011!iQ\u0001\rg\u000e\fG.\u00197pO\u001eLgn\u001a\u0006\u0003\t\u0016\u000b\u0001\u0002^=qKN\fg-\u001a\u0006\u0002\r\u0006\u00191m\\7\n\u0005!\u000b%!D*ue&\u001cG\u000fT8hO&tw\r\u0005\u0002\u000b\u0015&\u00111j\u0003\u0002\r'\u0016\u0014\u0018.\u00197ju\u0006\u0014G.\u001a\u0005\tI\u0001\u0011\t\u0011)A\u0005\u001bB\u0019a*U*\u000e\u0003=S!\u0001\u0015\u0003\u0002\t\u00154\u0018\r\\\u0005\u0003%>\u0013aaQ8fm\u0006d\u0007\u0003\u0002+V]ej\u0011AA\u0005\u0003-\n\u0011QbS1gW\u0006\u0004&o\u001c3vG\u0016\u0014\b\u0002\u0003-\u0001\u0005\u0003\u0005\u000b\u0011B-\u0002\u001fMDw.\u001e7e)\u0016\u0014X.\u001b8bi\u0016\u0004\"A\u0003.\n\u0005m[!a\u0002\"p_2,\u0017M\u001c\u0005\t;\u0002\u0011\t\u0011)A\u0005=\u0006Y\u0001/\u0019:bY2,
G.[:n!\tQq,\u0003\u0002a\u0017\t\u0019\u0011J\u001c;\t\u000b\t\u0004A\u0011B2\u0002\rqJg.\u001b;?)\u0011!WMZ4\u0011\tQ\u0003a&\u000f\u0005\u0006I\u0005\u0004\r!\u0014\u0005\u00061\u0006\u0004\r!\u0017\u0005\u0006;\u0006\u0004\rA\u0018\u0005\u0006S\u0002!\tA[\u0001\u0011GJ,\u0017\r^3Tk\n\u001c8M]5cKJ$Ra[A\u0011\u0003S\u0001RA\u00037o\u0003+I!!\\\u0006\u0003\rQ+\b\u000f\\33%\ry\u0017b\u001d\u0004\u0005aF\u0004aN\u0001\u0007=e\u00164\u0017N\\3nK:$h\b\u0003\u0003sQ\u0002q\u0017aA8viB\u0019Ao^\u000b\u000e\u0003UT!A^\t\u0002\u0013=\u00147/\u001a:wKJ\u001c\u0018B\u0001=v\u0005)\u0019VOY:de&\u0014WM\u001d\u0005\bu>\u0014\r\u0011b\u0001|\u0003%\u00198\r[3ek2,'/F\u0001}!\ri\u0018\u0011A\u0007\u0002}*\u0011q\u0010B\u0001\nKb,7-\u001e;j_:L1!a\u0001\u007f\u0005%\u00196\r[3ek2,'\u000fC\u0004\u0002\b=$\t!!\u0003\u0002\u0013Q,'/\\5oCR,Gc\u0001\u001f\u0002\f!I\u0011QBA\u0003\t\u0003\u0007\u0011qB\u0001\u0003G\n\u0004BACA\ty%\u0019\u00111C\u0006\u0003\u0011q\u0012\u0017P\\1nKz\u0002B!a\u0006\u0002\u001e5\u0011\u0011\u0011\u0004\u0006\u0004\u00037q\u0018aC2b]\u000e,G.\u00192mKNLA!a\b\u0002\u001a\t!\u0012i]:jO:\f'\r\\3DC:\u001cW\r\\1cY\u0016Dq!!\u0004i\u0001\u0004\t\u0019\u0003\u0005\u0003O\u0003Ka\u0014bAA\u0014\u001f\nA1)\u00197mE\u0006\u001c7\u000e\u0003\u0004\u0002,!\u0004\r\u0001`\u0001\u0002g\u001e9\u0011q\u0006\u0002\t\u0002\u0005E\u0012!E&bM.\f\u0007K]8ek\u000e,'oU5oWB\u0019A+a\r\u0007\r\u0005\u0011\u0001\u0012AA\u001b'\u0011\t\u0019$C%\t\u000f\t\f\u0019\u0004\"\u0001\u0002:Q\u0011\u0011\u0011\u0007\u0005\t\u0003{\t\u0019\u0004\"\u0001\u0002@\u0005)\u0011\r\u001d9msV1\u0011\u0011IA%\u0003\u001b\"b!a\u0011\u0002`\u0005%DCBA#\u0003\u001f\nI\u0006\u0005\u0004U\u0001\u0005\u001d\u00131\n\t\u0004_\u0005%CAB\u0019\u0002<\t\u0007!\u0007E\u00020\u0003\u001b\"aaOA\u001e\u0005\u0004\u0011\u0004\u0002CA)\u0003w\u0001\u001d!a\u0015\u0002\u0003-\u0003R\u0001VA+\u0003\u000fJ1!a\u0016\u0003\u0005)\u0019VM]5bY&TXM\u001d\u0005\t\u00037\nY\u0004q\u0001\u0002^\u0005\ta\u000bE\u0003U\u0003+\nY\u00
05\u0003\u0005\u0002b\u0005m\u0002\u0019AA2\u0003\u0019\u0019wN\u001c4jOB\u0019A+!\u001a\n\u0007\u0005\u001d$AA\nLC\u001a\\\u0017\r\u0015:pIV\u001cWM]\"p]\u001aLw\rC\u0004\u0002l\u0005m\u0002\u0019\u0001?\u0002\u0005%|\u0007\u0002CA\u001f\u0003g!\t!a\u001c\u0016\r\u0005E\u0014qOA>)\u0019\t\u0019(! \u0002\u0004B1A\u000bAA;\u0003s\u00022aLA<\t\u0019\t\u0014Q\u000eb\u0001eA\u0019q&a\u001f\u0005\rm\niG1\u00013\u0011\u001d!\u0013Q\u000ea\u0001\u0003\u007f\u0002BAT)\u0002\u0002B1A+VA;\u0003sBa!XA7\u0001\u0004q\u0006BCAD\u0003g\t\t\u0011\"\u0003\u0002\n\u0006Y!/Z1e%\u0016\u001cx\u000e\u001c<f)\t\tY\t\u0005\u0003\u0002\u000e\u0006]UBAAH\u0015\u0011\t\t*a%\u0002\t1\fgn\u001a\u0006\u0003\u0003+\u000bAA[1wC&!\u0011\u0011TAH\u0005\u0019y%M[3di\u0002")
/* loaded from: input_file:monix/kafka/KafkaProducerSink.class */
/*
 * Consumer that forwards every incoming batch of ProducerRecords to a KafkaProducer.
 *
 * This is the Java view of the class compiled from KafkaProducerSink.scala. The producer is
 * supplied as a Coeval and memoized per subscriber; each batch is sent either strictly in
 * order (parallelism == 1) or in groups of `parallelism` records that are gathered together.
 * Send failures are logged and the stream continues. When the stream ends and
 * shouldTerminate is set, the producer is closed before the final callback is signalled.
 */
public final class KafkaProducerSink<K, V> implements Consumer<Seq<ProducerRecord<K, V>>, BoxedUnit>, StrictLogging {
    // Name-mangled constructor parameters; public so the anonymous subscriber can read them.
    public final Coeval<KafkaProducer<K, V>> monix$kafka$KafkaProducerSink$$producer;
    public final boolean monix$kafka$KafkaProducerSink$$shouldTerminate;
    public final int monix$kafka$KafkaProducerSink$$parallelism;
    // Not final: the value arrives through the StrictLogging setter below (presumably invoked
    // by StrictLogging.$init$ during construction), and a final field cannot be assigned there.
    private Logger logger;

    // ---------------------------------------------------------------------------------------
    // Forwarders: each method below delegates to the static helper of the same name on the
    // Consumer / Function1 interface, passing this instance as the receiver.
    // ---------------------------------------------------------------------------------------

    public Task<BoxedUnit> apply(Observable<Seq<ProducerRecord<K, V>>> observable) {
        return Consumer.apply$(this, observable);
    }

    public <In2> Consumer<In2, BoxedUnit> contramap(Function1<In2, Seq<ProducerRecord<K, V>>> function1) {
        return Consumer.contramap$(this, function1);
    }

    public <R2> Consumer<Seq<ProducerRecord<K, V>>, R2> map(Function1<BoxedUnit, R2> function1) {
        return Consumer.map$(this, function1);
    }

    public <R2> Consumer<Seq<ProducerRecord<K, V>>, R2> mapAsync(Function1<BoxedUnit, Task<R2>> function1) {
        return Consumer.mapAsync$(this, function1);
    }

    public boolean apply$mcZD$sp(double d) {
        return Function1.apply$mcZD$sp$(this, d);
    }

    public double apply$mcDD$sp(double d) {
        return Function1.apply$mcDD$sp$(this, d);
    }

    public float apply$mcFD$sp(double d) {
        return Function1.apply$mcFD$sp$(this, d);
    }

    public int apply$mcID$sp(double d) {
        return Function1.apply$mcID$sp$(this, d);
    }

    public long apply$mcJD$sp(double d) {
        return Function1.apply$mcJD$sp$(this, d);
    }

    public void apply$mcVD$sp(double d) {
        Function1.apply$mcVD$sp$(this, d);
    }

    public boolean apply$mcZF$sp(float f) {
        return Function1.apply$mcZF$sp$(this, f);
    }

    public double apply$mcDF$sp(float f) {
        return Function1.apply$mcDF$sp$(this, f);
    }

    public float apply$mcFF$sp(float f) {
        return Function1.apply$mcFF$sp$(this, f);
    }

    public int apply$mcIF$sp(float f) {
        return Function1.apply$mcIF$sp$(this, f);
    }

    public long apply$mcJF$sp(float f) {
        return Function1.apply$mcJF$sp$(this, f);
    }

    public void apply$mcVF$sp(float f) {
        Function1.apply$mcVF$sp$(this, f);
    }

    public boolean apply$mcZI$sp(int i) {
        return Function1.apply$mcZI$sp$(this, i);
    }

    public double apply$mcDI$sp(int i) {
        return Function1.apply$mcDI$sp$(this, i);
    }

    public float apply$mcFI$sp(int i) {
        return Function1.apply$mcFI$sp$(this, i);
    }

    public int apply$mcII$sp(int i) {
        return Function1.apply$mcII$sp$(this, i);
    }

    public long apply$mcJI$sp(int i) {
        return Function1.apply$mcJI$sp$(this, i);
    }

    public void apply$mcVI$sp(int i) {
        Function1.apply$mcVI$sp$(this, i);
    }

    public boolean apply$mcZJ$sp(long j) {
        return Function1.apply$mcZJ$sp$(this, j);
    }

    public double apply$mcDJ$sp(long j) {
        return Function1.apply$mcDJ$sp$(this, j);
    }

    public float apply$mcFJ$sp(long j) {
        return Function1.apply$mcFJ$sp$(this, j);
    }

    public int apply$mcIJ$sp(long j) {
        return Function1.apply$mcIJ$sp$(this, j);
    }

    public long apply$mcJJ$sp(long j) {
        return Function1.apply$mcJJ$sp$(this, j);
    }

    public void apply$mcVJ$sp(long j) {
        Function1.apply$mcVJ$sp$(this, j);
    }

    public <A> Function1<A, Task<BoxedUnit>> compose(Function1<A, Observable<Seq<ProducerRecord<K, V>>>> function1) {
        return Function1.compose$(this, function1);
    }

    public <A> Function1<Observable<Seq<ProducerRecord<K, V>>>, A> andThen(Function1<Task<BoxedUnit>, A> function1) {
        return Function1.andThen$(this, function1);
    }

    public String toString() {
        return Function1.toString$(this);
    }

    /** Returns the logger installed through the StrictLogging setter. */
    public Logger logger() {
        return this.logger;
    }

    /** Setter required by the StrictLogging trait; stores the logger used by this sink. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /**
     * Builds the subscriber that performs the actual sending, paired with a dummy cancelable.
     *
     * @param callback  signalled once the stream has terminated (and, if requested, the
     *                  producer has been closed): onSuccess on completion, onError on failure
     * @param scheduler scheduler on which send and close tasks are run
     * @return the subscriber together with {@code AssignableCancelable.dummy()}
     */
    public Tuple2<Subscriber<Seq<ProducerRecord<K, V>>>, AssignableCancelable> createSubscriber(final Callback<BoxedUnit> callback, final Scheduler scheduler) {
        // Subscriber is instantiated without constructor arguments; the enclosing sink is reached
        // through KafkaProducerSink.this and callback/scheduler are captured from this method.
        return new Tuple2<>(new Subscriber<Seq<ProducerRecord<K, V>>>() { // from class: monix.kafka.KafkaProducerSink$$anon$1
            // Memoized so the producer Coeval is evaluated at most once for this subscriber.
            private final Coeval<KafkaProducer<K, V>> p = KafkaProducerSink.this.monix$kafka$KafkaProducerSink$$producer.memoize();
            // Guarded by this subscriber's monitor (onNext and terminate are synchronized).
            private boolean isActive = true;

            public Scheduler scheduler() {
                return scheduler;
            }

            /**
             * Maps every record to its send task. A non-fatal exception thrown while creating
             * the task is converted into a failed Task; fatal throwables are rethrown.
             */
            private Seq<Task<Option<RecordMetadata>>> sendAll(Seq<ProducerRecord<K, V>> seq) {
                return (Seq) seq.map(producerRecord -> {
                    try {
                        return ((KafkaProducer) this.p.value()).send(producerRecord);
                    } catch (Throwable th) {
                        Option unapply = NonFatal$.MODULE$.unapply(th);
                        if (unapply.isEmpty()) {
                            throw th;
                        }
                        return Task$.MODULE$.raiseError((Throwable) unapply.get());
                    }
                }, Seq$.MODULE$.canBuildFrom());
            }

            /**
             * Sends one batch. Returns Stop once the subscriber has been terminated; otherwise
             * the returned future completes with Continue after the batch was processed, even
             * when sending failed (the failure is only logged).
             */
            public synchronized Future<Ack> onNext(Seq<ProducerRecord<K, V>> seq) {
                Task sendTask;
                if (!this.isActive) {
                    return Ack$Stop$.MODULE$;
                }
                if (KafkaProducerSink.this.monix$kafka$KafkaProducerSink$$parallelism == 1) {
                    // Strictly sequential: one record after another.
                    sendTask = Task$.MODULE$.sequence(sendAll(seq), Seq$.MODULE$.canBuildFrom());
                } else {
                    // Non-overlapping windows of `parallelism` records: each window is gathered,
                    // the windows themselves are sequenced, and the results are flattened.
                    sendTask = Task$.MODULE$.sequence(seq.sliding(KafkaProducerSink.this.monix$kafka$KafkaProducerSink$$parallelism, KafkaProducerSink.this.monix$kafka$KafkaProducerSink$$parallelism).map(seq2 -> {
                        return Task$.MODULE$.gather(this.sendAll(seq2), Seq$.MODULE$.canBuildFrom());
                    }).toList(), List$.MODULE$.canBuildFrom()).map(list -> {
                        return list.flatten(Predef$.MODULE$.$conforms());
                    });
                }
                return sendTask.map(seq3 -> {
                    return Ack$Continue$.MODULE$;
                }).onErrorHandle(th -> {
                    // Best effort by design: log the failure and keep consuming.
                    if (KafkaProducerSink.this.logger().underlying().isErrorEnabled()) {
                        KafkaProducerSink.this.logger().underlying().error("Unexpected error in KafkaProducerSink", th);
                    }
                    return Ack$Continue$.MODULE$;
                }).runAsync(scheduler());
            }

            /**
             * Deactivates the subscriber exactly once and then runs {@code function0}. When
             * shouldTerminate is set, the producer is closed first and {@code function0} runs
             * after the close attempt has finished.
             */
            public synchronized void terminate(Function0<BoxedUnit> function0) {
                if (this.isActive) {
                    this.isActive = false;
                    if (KafkaProducerSink.this.monix$kafka$KafkaProducerSink$$shouldTerminate) {
                        Task$.MODULE$.apply(() -> {
                            return ((KafkaProducer) this.p.value()).close();
                        }).flatten(Predef$.MODULE$.$conforms()).materialize().runAsync(scheduler()).foreach(closeResult -> {
                            completeTermination(function0, closeResult);
                            return BoxedUnit.UNIT;
                        }, scheduler());
                    } else {
                        function0.apply$mcV$sp();
                    }
                }
            }

            public void onError(Throwable th) {
                terminate(() -> {
                    callback.onError(th);
                    return BoxedUnit.UNIT;
                });
            }

            public void onComplete() {
                terminate(() -> {
                    callback.onSuccess(BoxedUnit.UNIT);
                    return BoxedUnit.UNIT;
                });
            }

            /**
             * Runs the pending termination action once the producer close attempt has finished.
             * A failed close is logged; the action runs in either case.
             */
            private void completeTermination(Function0<BoxedUnit> function0, Try<?> closeResult) {
                if (closeResult instanceof Success) {
                    function0.apply$mcV$sp();
                } else {
                    if (!(closeResult instanceof Failure)) {
                        throw new MatchError(closeResult);
                    }
                    Throwable exception = ((Failure<?>) closeResult).exception();
                    if (KafkaProducerSink.this.logger().underlying().isErrorEnabled()) {
                        KafkaProducerSink.this.logger().underlying().error("Unexpected error in KafkaProducerSink", exception);
                    }
                    function0.apply$mcV$sp();
                }
            }
        }, AssignableCancelable$.MODULE$.dummy());
    }

    /**
     * Creates the sink.
     *
     * @param coeval producer to send records with; memoized separately by each subscriber
     * @param z      shouldTerminate: whether the producer is closed when the stream ends
     * @param i      parallelism: size of the record groups sent concurrently; checked with
     *               {@code Predef.require(i >= 1, ...)}
     */
    public KafkaProducerSink(Coeval<KafkaProducer<K, V>> coeval, boolean z, int i) {
        this.monix$kafka$KafkaProducerSink$$producer = coeval;
        this.monix$kafka$KafkaProducerSink$$shouldTerminate = z;
        this.monix$kafka$KafkaProducerSink$$parallelism = i;
        Function1.$init$(this);
        Consumer.$init$(this);
        StrictLogging.$init$(this);
        Predef$.MODULE$.require(i >= 1, () -> {
            return "parallelism >= 1";
        });
    }
}
