package io.laserdisc.mysql.binlog.stream;

import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.LiftIO;
import cats.effect.LiftIO$;
import cats.effect.kernel.Async;
import cats.effect.std.Dispatcher$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import org.typelevel.log4cats.Logger;
import scala.MatchError;
import scala.Tuple3;

/* compiled from: MysqlBinlogStream.scala */
/* loaded from: input_file:io/laserdisc/mysql/binlog/stream/MysqlBinlogStream$.class */
public final class MysqlBinlogStream$ {
    public static final MysqlBinlogStream$ MODULE$ = new MysqlBinlogStream$();

    public <F> Stream<F, Event> rawEvents(BinaryLogClient binaryLogClient, Async<F> async, Logger<F> logger, LiftIO<F> liftIO) {
        return Stream$.MODULE$.resource(Dispatcher$.MODULE$.parallel(async), async).flatMap(dispatcher -> {
            return Stream$.MODULE$.eval(Queue$.MODULE$.bounded(10000, async)).map(queue -> {
                MysSqlBinlogEventProcessor mysSqlBinlogEventProcessor = new MysSqlBinlogEventProcessor(binaryLogClient, queue, dispatcher, async, logger);
                return new Tuple3(queue, mysSqlBinlogEventProcessor, Stream$.MODULE$.eval(LiftIO$.MODULE$.apply(liftIO).liftIO(IO$.MODULE$.delay(() -> {
                    mysSqlBinlogEventProcessor.run();
                }).start().flatMap(fiber -> {
                    return (IO) fiber.joinWithNever(IO$.MODULE$.asyncForIO());
                }))));
            }).flatMap(tuple3 -> {
                if (tuple3 == null) {
                    throw new MatchError(tuple3);
                }
                Queue queue2 = (Queue) tuple3._1();
                return Stream$.MODULE$.fromQueueNoneTerminated(queue2, Stream$.MODULE$.fromQueueNoneTerminated$default$2(), async).concurrently((Stream) tuple3._3(), async).onFinalize(cats.effect.package$.MODULE$.Async().apply(async).delay(() -> {
                    binaryLogClient.disconnect();
                }), async).map(event -> {
                    return event;
                });
            }, NotGiven$.MODULE$.default());
        }, NotGiven$.MODULE$.default());
    }

    private MysqlBinlogStream$() {
    }
}
