package io.laserdisc.mysql.binlog.stream;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Sync;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import com.github.shyiko.mysql.binlog.event.Event;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$OptionStreamOps$;
import fs2.compat.NotGiven$;
import io.laserdisc.mysql.binlog.event.EventMessage;
import org.typelevel.log4cats.Logger;
import org.typelevel.log4cats.Logger$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.collection.StringOps$;
import scala.collection.immutable.List;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: package.scala */
/* loaded from: input_file:io/laserdisc/mysql/binlog/stream/package$.class */
/**
 * Decompiled form of the Scala package object for {@code io.laserdisc.mysql.binlog.stream}.
 *
 * <p>Exposes stream transformations ("pipes", represented as {@link Function1}) that turn a
 * stream of raw binlog {@link Event}s into transaction packages or into the individual
 * {@link EventMessage}s contained in those packages.
 */
public final class package$ {
    /** The single instance of this class; the constructor is private. */
    public static final package$ MODULE$ = new package$();

    /** Event count at or above which a transaction package is reported by {@link #warnBigTransactionPackage}. */
    private static final int BIG_TRANSACTION_THRESHOLD = 1000;

    /**
     * Builds a pipe that groups incoming events into transaction packages (see
     * {@link #streamTransactionPackages}), evaluates {@link #warnBigTransactionPackage} for each
     * package, and then emits the package's events.
     *
     * @param ref           reference holding the {@code TransactionState} that is updated per event
     * @param str           forwarded unchanged to {@code TransactionState.nextState}
     *                      (NOTE(review): presumably a schema/database name - confirm against caller)
     * @param genConcurrent type-class instance used to sequence the per-event effects
     * @param sync          type-class instance used to build the big-transaction warning effect
     * @param logger        logger used for the debug and warning messages
     * @return a function from a stream of binlog events to a stream of event messages
     */
    public <F> Function1<Stream<F, Event>, Stream<F, EventMessage>> streamEvents(Ref<F, TransactionState> ref, String str, GenConcurrent<F, Throwable> genConcurrent, Sync<F> sync, Logger<F> logger) {
        return stream -> stream
                .through(MODULE$.streamTransactionPackages(ref, str, genConcurrent, logger))
                .flatMap(transactionPackage -> Stream$.MODULE$
                        // Run the (possibly no-op) warning first, then switch to the package's events.
                        .eval(MODULE$.warnBigTransactionPackage(transactionPackage, sync, logger))
                        .$greater$greater(() -> Stream$.MODULE$.apply(transactionPackage.events()), NotGiven$.MODULE$.default()),
                        NotGiven$.MODULE$.default());
    }

    /**
     * Builds a pipe that groups incoming events into transaction packages (see
     * {@link #streamTransactionPackages}) and emits, for each package, the result of passing
     * its events through {@code io.laserdisc.mysql.binlog.compaction.package$.compact}.
     *
     * @param ref           reference holding the {@code TransactionState} that is updated per event
     * @param str           forwarded unchanged to {@code TransactionState.nextState}
     * @param genConcurrent type-class instance used to sequence the per-event effects
     * @param logger        logger used for the per-event debug message
     * @return a function from a stream of binlog events to a stream of compacted event messages
     */
    public <F> Function1<Stream<F, Event>, Stream<F, EventMessage>> streamCompactedEvents(Ref<F, TransactionState> ref, String str, GenConcurrent<F, Throwable> genConcurrent, Logger<F> logger) {
        return stream -> stream
                .through(MODULE$.streamTransactionPackages(ref, str, genConcurrent, logger))
                .flatMap(transactionPackage -> Stream$.MODULE$.apply(
                        io.laserdisc.mysql.binlog.compaction.package$.MODULE$.compact(transactionPackage.events())),
                        NotGiven$.MODULE$.default());
    }

    /**
     * Builds a pipe that, for every incoming event, logs it at debug level and then applies
     * {@code TransactionState.nextState(event, str)} to {@code ref} via {@code modifyState}.
     * Steps that produce an empty option are dropped ({@code unNone}); the remaining values
     * are emitted as {@code TransactionPackage}s.
     *
     * @param ref           reference holding the {@code TransactionState} that is updated per event
     * @param str           forwarded unchanged to {@code TransactionState.nextState}
     * @param genConcurrent type-class instance used to sequence logging and the state update
     * @param logger        logger used for the per-event debug message
     * @return a function from a stream of binlog events to a stream of transaction packages
     */
    public <F> Function1<Stream<F, Event>, Stream<F, TransactionPackage>> streamTransactionPackages(Ref<F, TransactionState> ref, String str, GenConcurrent<F, Throwable> genConcurrent, Logger<F> logger) {
        return stream -> Stream$OptionStreamOps$.MODULE$.unNone$extension(Stream$.MODULE$.OptionStreamOps(stream.evalMap(event ->
                FlatMapOps$.MODULE$.$greater$greater$extension(
                        implicits$.MODULE$.catsSyntaxFlatMapOps(
                                Logger$.MODULE$.apply(logger).debug(() ->
                                        new StringBuilder(22).append("received binlog event ").append(event).toString()),
                                genConcurrent),
                        () -> ref.modifyState(TransactionState$.MODULE$.nextState(event, str)),
                        genConcurrent))));
    }

    /**
     * Returns an effect that logs a warning when the package holds at least
     * {@code BIG_TRANSACTION_THRESHOLD} events, and a unit effect otherwise.
     *
     * <p>The warning includes the number of events per table; that distribution is computed
     * inside {@code Sync.delay}, i.e. as part of the returned effect rather than eagerly.
     *
     * @param transactionPackage package whose event count is inspected
     * @param sync               type-class instance used to suspend the computation and sequence the log call
     * @param logger             logger that receives the warning
     * @return an effect in {@code F} producing unit
     */
    public <F> F warnBigTransactionPackage(TransactionPackage transactionPackage, Sync<F> sync, Logger<F> logger) {
        if (transactionPackage.events().size() < BIG_TRANSACTION_THRESHOLD) {
            return (F) cats.effect.package$.MODULE$.Sync().apply(sync).unit();
        }
        return (F) implicits$.MODULE$.toFlatMapOps(cats.effect.package$.MODULE$.Sync().apply(sync).delay(() -> {
            // Count events per table: table name -> number of events.
            return transactionPackage.events().groupBy(eventMessage -> eventMessage.table()).map(entry -> {
                if (entry == null) {
                    throw new MatchError(entry);
                }
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((String) entry._1()), BoxesRunTime.boxToInteger(((List) entry._2()).size()));
            });
        }), sync).flatMap(distribution -> implicits$.MODULE$.toFunctorOps(Logger$.MODULE$.apply(logger).warn(() ->
                // The message states ">=" to match the guard above; stripMargin removes the
                // indentation up to and including the '|' on the second line.
                StringOps$.MODULE$.stripMargin$extension(Predef$.MODULE$.augmentString(new StringBuilder()
                        .append("Transaction has >= ")
                        .append(BIG_TRANSACTION_THRESHOLD)
                        .append(" elements in it with\n                                 |following distribution ")
                        .append(distribution)
                        .append("\n        ")
                        .toString()))), sync).map(boxedUnit -> BoxedUnit.UNIT));
    }

    private package$() {
    }
}
