package dev.chopsticks.dstream;

import akka.NotUsed;
import akka.NotUsed$;
import akka.grpc.scaladsl.StreamResponseRequestBuilder;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import dev.chopsticks.dstream.DstreamClient;
import dev.chopsticks.dstream.DstreamWorker;
import dev.chopsticks.dstream.metric.DstreamWorkerMetrics;
import dev.chopsticks.fp.ZRunnable$;
import dev.chopsticks.fp.iz_logging.IzLogging;
import dev.chopsticks.fp.iz_logging.IzLogging$;
import dev.chopsticks.fp.iz_logging.LogCtx$;
import dev.chopsticks.fp.util.SharedResourceManager;
import dev.chopsticks.stream.ZAkkaSource;
import dev.chopsticks.stream.ZAkkaSource$;
import eu.timepit.refined.api.RefType$;
import eu.timepit.refined.api.Refined;
import eu.timepit.refined.auto$;
import izumi.fundamentals.platform.language.CodePosition;
import izumi.fundamentals.platform.language.SourceFilePosition;
import izumi.logstage.api.IzLogger;
import izumi.logstage.api.Log;
import izumi.logstage.api.Log$Entry$;
import izumi.logstage.api.Log$Level$Debug$;
import izumi.logstage.api.Log$Level$Error$;
import izumi.logstage.api.Log$LogArg$;
import izumi.logstage.api.rendering.LogstageCodec$;
import izumi.reflect.HKTag$;
import izumi.reflect.Tag;
import izumi.reflect.Tag$;
import izumi.reflect.macrortti.LightTypeTag$;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.concurrent.TimeoutException;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.UninitializedFieldError;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.concurrent.Promise$;
import scala.jdk.DurationConverters$;
import scala.jdk.DurationConverters$ScalaDurationOps$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import sourcecode.FileName;
import sourcecode.Line;
import zio.CanFail$;
import zio.Exit;
import zio.Has;
import zio.Has$;
import zio.Has$HasSyntax$;
import zio.NeedsEnv$;
import zio.Schedule;
import zio.Schedule$;
import zio.Task$;
import zio.UIO$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$AccessMPartiallyApplied$;
import zio.ZIO$BracketExitAcquire$;
import zio.ZLayer;
import zio.ZManaged$;
import zio.ZManaged$AccessManagedPartiallyApplied$;

/* compiled from: DstreamWorker.scala */
/* loaded from: input_file:dev/chopsticks/dstream/DstreamWorker$.class */
public final class DstreamWorker$ {
    /** Singleton instance of the Scala companion object. */
    public static final DstreamWorker$ MODULE$ = new DstreamWorker$();

    /** Retry predicate returned by {@link #defaultRetryPolicy()}. */
    private static final PartialFunction<Throwable, Object> defaultRetryPolicy = new DstreamWorker$$anonfun$1();

    /**
     * Initialization bitmap checked by {@link #defaultRetryPolicy()}; the {@code 4} flag marks
     * {@code defaultRetryPolicy} as assigned. Declared after that field so the flag is only set
     * once the field holds its value. The field starts at zero, so OR-ing in the flag yields 4.
     */
    private static volatile byte bitmap$init$0 = (byte) 4;

    /**
     * Returns the default retry predicate, which {@link #createRetrySchedule$default$3()} hands out
     * as the default third argument of {@code createRetrySchedule}.
     *
     * @return the shared {@code defaultRetryPolicy} partial function
     * @throws UninitializedFieldError if the {@code 4} flag of {@code bitmap$init$0} is not set,
     *     i.e. the field is read before it was assigned
     */
    public PartialFunction<Throwable, Object> defaultRetryPolicy() {
        if (((byte) (bitmap$init$0 & 4)) == 0) {
            throw new UninitializedFieldError("Uninitialized field: /runner/_work/chopsticks/chopsticks/chopsticks-dstream/src/main/scala/dev/chopsticks/dstream/DstreamWorker.scala: 50");
        }
        return defaultRetryPolicy;
    }

    /**
     * Builds the retry schedule for one worker.
     *
     * <p>The schedule keeps going while {@code partialFunction} maps the failure to {@code true}
     * (failures it is not defined for count as {@code false}). That predicate is combined with
     * {@code &&} against an exponential schedule starting at {@code retryInitialDelay}, reset after
     * {@code retryResetAfter}, which is itself combined with {@code ||} against a fixed spacing of
     * {@code retryMaxDelay} (presumably to cap the back-off; confirm against ZIO's {@code ||}
     * semantics).
     *
     * <p>Every decision is logged: a final decision at error level ("will NOT retry"), a
     * continuing decision at debug level ("will retry") together with the time until the next
     * attempt.
     *
     * @param i worker id, used only in the log messages
     * @param dstreamWorkerRetryConfig source of the initial delay, reset window and max delay
     * @param partialFunction predicate deciding whether a failure is retryable
     * @return the combined schedule with its output discarded
     */
    public Schedule<Has<IzLogging.Service>, Throwable, BoxedUnit> createRetrySchedule(int i, DstreamWorker.DstreamWorkerRetryConfig dstreamWorkerRetryConfig, PartialFunction<Throwable, Object> partialFunction) {
        return Schedule$.MODULE$.identity().whileOutput(th -> {
            return BoxesRunTime.boxToBoolean($anonfun$createRetrySchedule$1(partialFunction, th));
        }).$amp$amp(
            Schedule$.MODULE$.exponential(DurationConverters$ScalaDurationOps$.MODULE$.toJava$extension(DurationConverters$.MODULE$.ScalaDurationOps(dstreamWorkerRetryConfig.retryInitialDelay())), Schedule$.MODULE$.exponential$default$2())
                .resetAfter(DurationConverters$ScalaDurationOps$.MODULE$.toJava$extension(DurationConverters$.MODULE$.ScalaDurationOps(dstreamWorkerRetryConfig.retryResetAfter())))
                .$bar$bar(Schedule$.MODULE$.spaced(DurationConverters$ScalaDurationOps$.MODULE$.toJava$extension(DurationConverters$.MODULE$.ScalaDurationOps(dstreamWorkerRetryConfig.retryMaxDelay()))))
        ).onDecision(decision -> {
            // The schedule output is a pair whose first element is the failure being judged.
            if (decision instanceof Schedule.Decision.Done) {
                Tuple2 doneOut = (Tuple2) ((Schedule.Decision.Done) decision).out();
                if (doneOut != null) {
                    Throwable doneFailure = (Throwable) doneOut._1();
                    return IzLogging$.MODULE$.logger().map(doneLogger -> {
                        $anonfun$createRetrySchedule$4(i, doneFailure, doneLogger);
                        return BoxedUnit.UNIT;
                    });
                }
            }
            if (decision instanceof Schedule.Decision.Continue) {
                Schedule.Decision.Continue next = (Schedule.Decision.Continue) decision;
                Tuple2 continueOut = (Tuple2) next.out();
                OffsetDateTime interval = next.interval();
                if (continueOut != null) {
                    Throwable retryFailure = (Throwable) continueOut._1();
                    // Return here: falling through would raise MatchError on every retry decision.
                    return IzLogging$.MODULE$.logger().map(retryLogger -> {
                        $anonfun$createRetrySchedule$5(i, interval, retryFailure, retryLogger);
                        return BoxedUnit.UNIT;
                    });
                }
            }
            // Neither pattern matched (unknown decision type or null output).
            throw new MatchError(decision);
        }).unit();
    }

    /**
     * Synthetic accessor for the default value of the third parameter of
     * {@code createRetrySchedule}.
     *
     * @return the result of {@link #defaultRetryPolicy()}
     */
    public PartialFunction<Throwable, Object> createRetrySchedule$default$3() {
        final PartialFunction<Throwable, Object> policy = this.defaultRetryPolicy();
        return policy;
    }

    /**
     * Runs one worker per id in {@code 1..parallelism}, in parallel, discarding the results.
     *
     * @param dstreamWorkerConfig configuration; its refined {@code parallelism} value is unwrapped
     *     to obtain the number of workers
     * @param function2 called with (worker id, assignment) to produce the result source
     * @param function1 called with the worker id to produce that worker's retry schedule
     * @param tag type tag for {@code Assignment}
     * @param tag2 type tag for {@code Result}
     * @return an effect running every worker via {@code $anonfun$runWorkers$1}
     */
    public <Assignment, Result> ZIO<Has<SharedResourceManager.Service<Object, String, DstreamWorkerMetrics>>, Throwable, BoxedUnit> runWorkers(DstreamWorker.DstreamWorkerConfig dstreamWorkerConfig, Function2<Object, Assignment, ZIO<Object, Throwable, Source<Result, NotUsed>>> function2, Function1<Object, Schedule<Object, Throwable, Object>> function1, Tag<Assignment> tag, Tag<Result> tag2) {
        // Unwrap the refined parallelism value into a plain int.
        final int workerCount = BoxesRunTime.unboxToInt(auto$.MODULE$.autoUnwrap(new Refined(dstreamWorkerConfig.parallelism()), RefType$.MODULE$.refinedRefType()));
        return ZIO$.MODULE$.foreachPar_(
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), workerCount),
            workerId -> $anonfun$runWorkers$1(tag, tag2, dstreamWorkerConfig, function2, function1, BoxesRunTime.unboxToInt(workerId)));
    }

    /**
     * Builds the layer providing {@code DstreamWorker.Service<Assignment, Result>}.
     *
     * <p>{@link #runWorkers} is wrapped with {@code ZRunnable} and the resulting three-argument
     * function is exposed through an anonymous {@code Service}. The service's {@code run} reads the
     * caller's environment and provides it to both the user-supplied source factory and the
     * user-supplied retry-schedule factory before delegating.
     *
     * @param tag type tag for {@code Assignment}
     * @param tag2 type tag for {@code Result}
     * @return the service layer
     */
    public <Assignment, Result> ZLayer<Has<IzLogging.Service>, Nothing$, Has<DstreamWorker.Service<Assignment, Result>>> live(Tag<Assignment> tag, Tag<Result> tag2) {
        return ZRunnable$.MODULE$.apply((dstreamWorkerConfig, function2, function1) -> {
            return MODULE$.runWorkers(dstreamWorkerConfig, function2, function1, tag, tag2);
        }).toLayer(function3 -> {
            return new DstreamWorker.Service<Assignment, Result>(function3) { // from class: dev.chopsticks.dstream.DstreamWorker$$anon$1
                private final Function3 fn$1;

                @Override // dev.chopsticks.dstream.DstreamWorker.Service
                public <R1, R2> ZIO<R1, Throwable, BoxedUnit> run(DstreamWorker.DstreamWorkerConfig dstreamWorkerConfig2, Function2<Object, Assignment, ZIO<R1, Throwable, Source<Result, NotUsed>>> function22, Function1<Object, Schedule<R2, Throwable, Object>> function12) {
                    // `env` is the caller's environment; it must stay distinct from the worker-id
                    // parameter of the nested lambdas so that it is what gets provided below.
                    return ZIO$.MODULE$.environment().flatMap(env -> {
                        return ((ZIO) this.fn$1.apply(dstreamWorkerConfig2, (workerId, assignment) -> {
                            return $anonfun$run$2(function22, env, BoxesRunTime.unboxToInt(workerId), assignment);
                        }, scheduleWorkerId -> {
                            return $anonfun$run$3(function12, env, BoxesRunTime.unboxToInt(scheduleWorkerId));
                        })).map(boxedUnit -> {
                            $anonfun$run$4(boxedUnit);
                            return BoxedUnit.UNIT;
                        });
                    });
                }

                /** Calls the source factory for (worker id, assignment) and provides {@code obj} as its environment. */
                public static final /* synthetic */ ZIO $anonfun$run$2(Function2 function22, Object obj, int i, Object obj2) {
                    return ((ZIO) function22.apply(BoxesRunTime.boxToInteger(i), obj2)).provide(obj, NeedsEnv$.MODULE$.needsEnv());
                }

                /** Calls the retry-schedule factory for the worker id and provides {@code obj} as its environment. */
                public static final /* synthetic */ Schedule $anonfun$run$3(Function1 function12, Object obj, int i) {
                    return ((Schedule) function12.apply(BoxesRunTime.boxToInteger(i))).provide(obj);
                }

                /** No-op mapping step; the caller substitutes {@code BoxedUnit.UNIT}. */
                public static final /* synthetic */ void $anonfun$run$4(BoxedUnit boxedUnit) {
                }

                {
                    this.fn$1 = function3;
                }
            };
        }, Tag$.MODULE$.appliedTag(HKTag$.MODULE$.apply(DstreamWorker.Service.class, LightTypeTag$.MODULE$.parse(-1138425775, "\u0003��\u0002��\u00010��\u00011\u0001��,dev.chopsticks.dstream.DstreamWorker.Service\u0002��\u0004��\u0001\u0090\u0002\u0001\u0001����\u0004��\u0001\u0090\u0003\u0001\u0001��\u0002\u0003����$dev.chopsticks.dstream.DstreamWorker\u0001\u0001", "������", 11)), new $colon.colon(tag.tag(), new $colon.colon(tag2.tag(), Nil$.MODULE$))));
    }

    /**
     * Fallback used when the retry predicate is not defined for a failure.
     *
     * @param obj the unmatched failure (ignored)
     * @return always {@code false}
     */
    public static final /* synthetic */ boolean $anonfun$createRetrySchedule$2(Object obj) {
        final boolean retryable = false;
        return retryable;
    }

    /**
     * Evaluates the retry predicate for a failure, treating "not defined" as {@code false}.
     *
     * @param partialFunction the retry predicate
     * @param th the failure to evaluate
     * @return the predicate's result, or {@code false} when the predicate is not defined for {@code th}
     */
    public static final /* synthetic */ boolean $anonfun$createRetrySchedule$1(PartialFunction partialFunction, Throwable th) {
        final Object verdict = partialFunction.applyOrElse(
            th,
            unmatched -> BoxesRunTime.boxToBoolean($anonfun$createRetrySchedule$2(unmatched)));
        return BoxesRunTime.unboxToBoolean(verdict);
    }

    /**
     * Logs, at error level, that the worker will not retry after the given failure.
     *
     * <p>Nothing is built or logged when the logger does not accept error-level entries for this
     * logger id.
     *
     * @param i worker id, logged as {@code workerId}
     * @param th the failure, logged as {@code exception}
     * @param izLogger the logger to write to
     */
    public static final /* synthetic */ void $anonfun$createRetrySchedule$4(int i, Throwable th, IzLogger izLogger) {
        if (izLogger.acceptable("dev.chopsticks.dstream.DstreamWorker.createRetrySchedule.70.72", Log$Level$Error$.MODULE$)) {
            final StringContext template = new StringContext(scala.package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " will NOT retry ", ""})));
            final Log.LogArg workerIdArg = Log$LogArg$.MODULE$.apply((Seq) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"workerId"})), BoxesRunTime.boxToInteger(i), false, new Some(LogstageCodec$.MODULE$.LogstageCodecInt()));
            final Log.LogArg exceptionArg = Log$LogArg$.MODULE$.apply((Seq) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"exception"})), th, false, new Some(LogstageCodec$.MODULE$.LogstageCodecThrowable()));
            final Log.Message message = new Log.Message(template, new $colon.colon(workerIdArg, new $colon.colon(exceptionArg, Nil$.MODULE$)));
            final CodePosition position = new CodePosition(new SourceFilePosition("DstreamWorker.scala", 72), "dev.chopsticks.dstream.DstreamWorker.createRetrySchedule.70.72");
            izLogger.unsafeLog(Log$Entry$.MODULE$.create(Log$Level$Error$.MODULE$, message, position));
        }
    }

    /**
     * Logs, at debug level, that the worker will retry, including the time remaining until the
     * next attempt and the failure's message.
     *
     * <p>Nothing is built or logged when the logger does not accept debug-level entries for this
     * logger id.
     *
     * @param i worker id, logged as {@code workerId}
     * @param offsetDateTime instant of the next attempt; logged as the {@code duration} between now and it
     * @param th the failure; only its message is logged, as {@code exception}
     * @param izLogger the logger to write to
     */
    public static final /* synthetic */ void $anonfun$createRetrySchedule$5(int i, OffsetDateTime offsetDateTime, Throwable th, IzLogger izLogger) {
        if (izLogger.acceptable("dev.chopsticks.dstream.DstreamWorker.createRetrySchedule.70.74", Log$Level$Debug$.MODULE$)) {
            final StringContext template = new StringContext(scala.package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"", " will retry ", " ", ""})));
            final Log.LogArg workerIdArg = Log$LogArg$.MODULE$.apply((Seq) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"workerId"})), BoxesRunTime.boxToInteger(i), false, new Some(LogstageCodec$.MODULE$.LogstageCodecInt()));
            final Log.LogArg durationArg = Log$LogArg$.MODULE$.apply((Seq) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"duration"})), Duration.between(OffsetDateTime.now(), offsetDateTime), false, None$.MODULE$);
            final Log.LogArg exceptionArg = Log$LogArg$.MODULE$.apply((Seq) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{"exception"})), th.getMessage(), false, new Some(LogstageCodec$.MODULE$.LogstageCodecString()));
            final Log.Message message = new Log.Message(template, new $colon.colon(workerIdArg, new $colon.colon(durationArg, new $colon.colon(exceptionArg, Nil$.MODULE$))));
            final CodePosition position = new CodePosition(new SourceFilePosition("DstreamWorker.scala", 74), "dev.chopsticks.dstream.DstreamWorker.createRetrySchedule.70.74");
            izLogger.unsafeLog(Log$Entry$.MODULE$.create(Log$Level$Debug$.MODULE$, message, position));
        }
    }

    /**
     * Tells whether a failure is a timeout.
     *
     * @param th the failure to classify
     * @return {@code true} when {@code th} is a {@link TimeoutException}
     */
    public static final /* synthetic */ boolean $anonfun$runWorkers$9(Throwable th) {
        final boolean timedOut = th instanceof TimeoutException;
        return timedOut;
    }

    /**
     * Builds the effect that runs a single worker.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Obtain the worker's {@code DstreamWorkerMetrics} from the {@code SharedResourceManager}
     *       service, keyed by the worker id rendered as a string, for the lifetime of the worker.</li>
     *   <li>Obtain a request builder from the {@code DstreamClient} service using the configured
     *       client settings.</li>
     *   <li>Define one attempt as a bracket: acquire increments {@code attemptsTotal} and sets
     *       {@code workerStatus} to 1; release sets {@code workerStatus} to 0 and updates the
     *       success / timeout / failure counters from the exit value.</li>
     *   <li>Inside the attempt, invoke the request with a source backed by a promise, apply the
     *       configured initial timeout, and for each received assignment call {@code function2},
     *       complete the promise with the produced source and wait on the promise's future. The
     *       stream is run to its last element (if any).</li>
     *   <li>Repeat the attempt forever, retrying failures with the schedule from {@code function1}.</li>
     * </ol>
     *
     * @param tag type tag for the assignment type
     * @param tag2 type tag for the result type
     * @param dstreamWorkerConfig client settings and assignment timeout
     * @param function2 called with (worker id, assignment) to produce the result source
     * @param function1 called with the worker id to produce the retry schedule
     * @param i worker id
     * @return the worker effect
     */
    public static final /* synthetic */ ZIO $anonfun$runWorkers$1(Tag tag, Tag tag2, DstreamWorker.DstreamWorkerConfig dstreamWorkerConfig, Function2 function2, Function1 function1, int i) {
        return ZManaged$AccessManagedPartiallyApplied$.MODULE$.apply$extension(ZManaged$.MODULE$.accessManaged(), has -> {
            return ((SharedResourceManager.Service) Has$HasSyntax$.MODULE$.get$extension(Has$.MODULE$.HasSyntax(has), $less$colon$less$.MODULE$.refl(), Tag$.MODULE$.apply(SharedResourceManager.Service.class, LightTypeTag$.MODULE$.parse(-322977949, "\u0001��4dev.chopsticks.fp.util.SharedResourceManager.Service\u0003��\u0004��\u0001\tscala.Any\u0001\u0001\u0001��\u0004��\u0001\u0010java.lang.String\u0001\u0001����\u0004��\u00012dev.chopsticks.dstream.metric.DstreamWorkerMetrics\u0001\u0001��\u0002\u0003����,dev.chopsticks.fp.util.SharedResourceManager\u0001\u0001", "��\u0001\u0004��\u0001\u0010java.lang.String\u0001\u0001\u0003\u0004��\u0001\u0016java.lang.CharSequence\u0001\u0001\u0004��\u0001\u0014java.io.Serializable\u0001\u0001\u0001��\u0014java.lang.Comparable\u0001��\u0004��\u0001\u0090\u0002\u0001\u0001��\u0001\u0001��\u0001\u0090\u0002\u0001\u0001\u0003��\u0001\u0090\u0003\u0001\u0001��\u0001\u0090\u0004\u0001\u0001��\u0001\u0090\u0005\u0001\u0001", 11)))).manage(Integer.toString(i));
        }).use(dstreamWorkerMetrics -> {
            return ZIO$AccessMPartiallyApplied$.MODULE$.apply$extension(ZIO$.MODULE$.accessM(), has2 -> {
                return ((DstreamClient.Service) Has$HasSyntax$.MODULE$.get$extension(Has$.MODULE$.HasSyntax(has2), $less$colon$less$.MODULE$.refl(), Tag$.MODULE$.appliedTag(HKTag$.MODULE$.apply(DstreamClient.Service.class, LightTypeTag$.MODULE$.parse(216507150, "\u0003��\u0002��\u00010��\u00011\u0001��,dev.chopsticks.dstream.DstreamClient.Service\u0002��\u0004��\u0001\u0090\u0002\u0001\u0001����\u0004��\u0001\u0090\u0003\u0001\u0001��\u0002\u0003����$dev.chopsticks.dstream.DstreamClient\u0001\u0001", "������", 11)), new $colon.colon(tag.tag(), new $colon.colon(tag2.tag(), Nil$.MODULE$))))).requestBuilder(dstreamWorkerConfig.clientSettings());
            }).map(function12 -> {
                // Pair the request builder with the bracketed single-attempt effect.
                return new Tuple2(function12, ZIO$BracketExitAcquire$.MODULE$.apply$extension(ZIO$.MODULE$.bracketExit(UIO$.MODULE$.apply(() -> {
                    dstreamWorkerMetrics.attemptsTotal().inc();
                    dstreamWorkerMetrics.workerStatus().set(1);
                })), (acquired, exit) -> {
                    // Release: mark the worker idle, then classify how the attempt ended.
                    return UIO$.MODULE$.apply(() -> {
                        dstreamWorkerMetrics.workerStatus().set(0);
                        if (exit instanceof Exit.Success) {
                            // A completed stream that produced no element is counted as a timeout.
                            if (((Option) ((Exit.Success) exit).value()).nonEmpty()) {
                                dstreamWorkerMetrics.successesTotal().inc();
                            } else {
                                dstreamWorkerMetrics.timeoutsTotal().inc();
                            }
                        } else if (exit instanceof Exit.Failure) {
                            if (((Exit.Failure) exit).cause().failures().exists(th -> {
                                return BoxesRunTime.boxToBoolean($anonfun$runWorkers$9(th));
                            })) {
                                dstreamWorkerMetrics.timeoutsTotal().inc();
                            } else {
                                dstreamWorkerMetrics.failuresTotal().inc();
                            }
                        } else {
                            throw new MatchError(exit);
                        }
                        return BoxedUnit.UNIT;
                    });
                }).apply(boxedUnit2 -> {
                    return UIO$.MODULE$.apply(() -> {
                        return Promise$.MODULE$.apply();
                    }).flatMap(promise -> {
                        ZAkkaSource.InterruptibleZAkkaSourceOps InterruptibleZAkkaSourceOps = ZAkkaSource$.MODULE$.InterruptibleZAkkaSourceOps(() -> {
                            return ZAkkaSource$.MODULE$.SourceToZAkkaSource(() -> {
                                // The outgoing stream is backed by the promise, completed below once
                                // function2 has produced the result source for the assignment.
                                return ((StreamResponseRequestBuilder) function12.apply(BoxesRunTime.boxToInteger(i))).invoke(Source$.MODULE$.futureSource(promise.future()).mapMaterializedValue(future -> {
                                    return NotUsed$.MODULE$;
                                }));
                            }).toZAkkaSource().killSwitch().viaBuilder(flow -> {
                                return flow.initialTimeout(dstreamWorkerConfig.assignmentTimeout().duration());
                            }).mapAsync(1, obj -> {
                                return ((ZIO) function2.apply(BoxesRunTime.boxToInteger(i), obj)).tap(source -> {
                                    return UIO$.MODULE$.apply(() -> {
                                        return promise.success(source);
                                    });
                                }).zipRight(() -> {
                                    return Task$.MODULE$.fromFuture(executionContext -> {
                                        return promise.future();
                                    });
                                }).as(() -> {
                                    return obj;
                                });
                            });
                        });
                        return InterruptibleZAkkaSourceOps.interruptibleRunWith(() -> {
                            return Sink$.MODULE$.lastOption();
                        }, InterruptibleZAkkaSourceOps.interruptibleRunWith$default$2(), LogCtx$.MODULE$.autoLogContext(new Line(139), new FileName("DstreamWorker.scala"))).map(option -> {
                            return option;
                        });
                    });
                }));
            }).flatMap(tuple2 -> {
                if (tuple2 != null) {
                    // Run attempts back to back; on failure, retry per this worker's schedule.
                    return ((ZIO) tuple2._2()).forever().unit().retry((Schedule) function1.apply(BoxesRunTime.boxToInteger(i)), CanFail$.MODULE$.canFail()).map(boxedUnit -> {
                        return BoxedUnit.UNIT;
                    });
                }
                throw new MatchError(tuple2);
            });
        });
    }

    /** Private constructor: the only instance is the one stored in {@code MODULE$}. */
    private DstreamWorker$() {
    }
}
