package tamer;

import java.time.Duration;
import log.effect.LogWriter;
import log.effect.LogWriter$;
import log.effect.LogWriterOps$;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.StrictOptimizedIterableOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.math.Ordering;
import scala.math.Ordering$;
import scala.math.Ordering$Int$;
import scala.math.Ordering$String$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import tamer.Tamer;
import zio.CanFail$;
import zio.Chunk;
import zio.Chunk$;
import zio.Dequeue;
import zio.DurationSyntax$;
import zio.Enqueue;
import zio.NonEmptyChunk;
import zio.Promise$;
import zio.Schedule;
import zio.Schedule$;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$ScopedPartiallyApplied$;
import zio.ZLayer;
import zio.Zippable$;
import zio.kafka.consumer.Consumer;
import zio.kafka.consumer.Offset;
import zio.kafka.consumer.Subscription;
import zio.kafka.consumer.Subscription$;
import zio.kafka.producer.Transaction;
import zio.kafka.producer.TransactionalProducer;
import zio.kafka.serde.Serializer;
import zio.package;
import zio.stream.ZStream;
import zio.stream.ZStream$;

/* compiled from: Tamer.scala */
/* loaded from: input_file:tamer/Tamer$.class */
public final class Tamer$ {
    public static final Tamer$ MODULE$ = new Tamer$();
    private static final Schedule<Object, Object, Tuple2<Object, Duration>> retries = Schedule$.MODULE$.recurs(10, "tamer.Tamer.retries(Tamer.scala:58)").$amp$amp(Schedule$.MODULE$.exponential(DurationSyntax$.MODULE$.milliseconds$extension(zio.package$.MODULE$.durationInt(100)), Schedule$.MODULE$.exponential$default$2(), "tamer.Tamer.retries(Tamer.scala:58)"), Zippable$.MODULE$.Zippable2());

    /* JADX INFO: Access modifiers changed from: private */
    public final Schedule<Object, Object, Tuple2<Object, Duration>> retries() {
        return retries;
    }

    private final Offset OffsetOps(Offset offset) {
        return offset;
    }

    public final <K, V> ZStream<Object, Throwable, BoxedUnit> sinkStream(String str, Serializer<Object, K> serializer, Serializer<Object, V> serializer2, Dequeue<Tuple2<Tamer.TxInfo, Chunk<Record<K, V>>>> dequeue, LogWriter<ZIO> logWriter) {
        return ZStream$.MODULE$.fromQueueWithShutdown(() -> {
            return dequeue;
        }, () -> {
            return ZStream$.MODULE$.fromQueueWithShutdown$default$2();
        }, "tamer.Tamer.sinkStream(Tamer.scala:72)").mapZIO(tuple2 -> {
            if (tuple2 != null) {
                Tamer.TxInfo txInfo = (Tamer.TxInfo) tuple2._1();
                Chunk chunk = (Chunk) tuple2._2();
                if (txInfo instanceof Tamer.TxInfo.Context) {
                    Transaction transaction = ((Tamer.TxInfo.Context) txInfo).transaction();
                    if (chunk.nonEmpty()) {
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return "pushing " + chunk.size() + " messages to " + str;
                        })).$times$greater(() -> {
                            return transaction.produceChunk(chunk.map(record -> {
                                return record.toKafkaProducerRecord(str);
                            }), serializer, serializer2, None$.MODULE$).tapError(th -> {
                                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return "failed pushing " + chunk.size() + " messages to " + str + ", will retry. Caused by: " + th.getMessage();
                                });
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:78)").retry(() -> {
                                return MODULE$.retries();
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:79)").tapError(th2 -> {
                                return (ZIO) LogWriterOps$.MODULE$.warn$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return "finally failed pushing " + chunk.size() + " messages to " + str + ", will abort. Caused by: " + th2.getMessage();
                                }, () -> {
                                    return th2;
                                });
                            }, CanFail$.MODULE$.canFail(), "tamer.Tamer.sinkStream(Tamer.scala:80)").unit("tamer.Tamer.sinkStream(Tamer.scala:81)");
                        }, "tamer.Tamer.sinkStream(Tamer.scala:75)").$times$greater(() -> {
                            return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                return "successfully pushed " + chunk.size() + " messages to " + str;
                            });
                        }, "tamer.Tamer.sinkStream(Tamer.scala:81)");
                    }
                }
            }
            if (tuple2 != null) {
                Tamer.TxInfo txInfo2 = (Tamer.TxInfo) tuple2._1();
                if (txInfo2 instanceof Tamer.TxInfo.Delimiter) {
                    return ((Tamer.TxInfo.Delimiter) txInfo2).promise().succeed(BoxedUnit.UNIT, "tamer.Tamer.sinkStream(Tamer.scala:84)").unit("tamer.Tamer.sinkStream(Tamer.scala:84)").$less$times(() -> {
                        return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return "user implicitly signalled end of data production";
                        });
                    }, "tamer.Tamer.sinkStream(Tamer.scala:84)");
                }
            }
            return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                return "received an empty chunk for " + str;
            });
        }, "tamer.Tamer.sinkStream(Tamer.scala:73)");
    }

    public final <K, V, SV> ZStream<Object, Throwable, BoxedUnit> sourceStream(String str, String str2, int i, zio.kafka.serde.Serde<Object, Tamer.StateKey> serde, zio.kafka.serde.Serde<Object, SV> serde2, SV sv, Consumer consumer, TransactionalProducer transactionalProducer, Enqueue<Tuple2<Tamer.TxInfo, Chunk<Record<K, V>>>> enqueue, Function2<SV, Enqueue<NonEmptyChunk<Record<K, V>>>, ZIO<Object, Throwable, SV>> function2, LogWriter<ZIO> logWriter) {
        Tamer.StateKey stateKey = new Tamer.StateKey(RichInt$.MODULE$.toHexString$extension(Predef$.MODULE$.intWrapper(i)), str2);
        Subscription subscription = Subscription$.MODULE$.topics(str, Nil$.MODULE$);
        ZIO map = consumer.partitionsFor(str, consumer.partitionsFor$default$2()).map(list -> {
            return list.map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).toSet();
        }, "tamer.Tamer.sourceStream.partitionSet(Tamer.scala:106)");
        ZIO flatMap = ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
            return "obtaining information on topic " + str;
        })).flatMap(boxedUnit -> {
            return map.flatMap(set -> {
                return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return "received the following information on topic " + str + ": " + map;
                })).flatMap(boxedUnit -> {
                    return consumer.committed(set, consumer.committed$default$2()).flatMap(map2 -> {
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return "received the following commited state information on topic " + str + " for the group " + str2 + ": " + map2;
                        })).flatMap(boxedUnit -> {
                            return consumer.endOffsets(set, consumer.endOffsets$default$2()).flatMap(map2 -> {
                                return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return "received the following end offsets information on the topic " + str + ": " + map2;
                                })).flatMap(boxedUnit -> {
                                    return decide$1(map2, map2).flatMap(startupDecision -> {
                                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                            return "decided to " + startupDecision;
                                        })).map(boxedUnit -> {
                                            return startupDecision;
                                        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:133)");
                                    }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:132)");
                                }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:131)");
                            }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:130)");
                        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:129)");
                    }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:128)");
                }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:127)");
            }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:126)");
        }, "tamer.Tamer.sourceStream.startupDecision(Tamer.scala:125)").flatMap(startupDecision -> {
            if (Tamer$StartupDecision$Initialize$.MODULE$.equals(startupDecision)) {
                return ((ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return "consumer group " + str2 + " never consumed from topic " + str;
                })).$times$greater(() -> {
                    return ZIO$ScopedPartiallyApplied$.MODULE$.apply$extension(ZIO$.MODULE$.scoped(), () -> {
                        return transactionalProducer.createTransaction().flatMap(transaction -> {
                            return transaction.produce(str, stateKey, sv, serde, serde2, None$.MODULE$).tap(recordMetadata -> {
                                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                    return "pushed initial state " + sv + " to " + recordMetadata;
                                });
                            }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:143)").unit("tamer.Tamer.sourceStream.initialize(Tamer.scala:144)");
                        }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:140)");
                    }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:139)");
                }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:138)");
            }
            if (Tamer$StartupDecision$Resume$.MODULE$.equals(startupDecision)) {
                return (ZIO) LogWriterOps$.MODULE$.info$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return "consumer group " + str2 + " resuming consumption from topic " + str;
                });
            }
            throw new MatchError(startupDecision);
        }, "tamer.Tamer.sourceStream.initialize(Tamer.scala:136)");
        ZStream mapZIO = consumer.plainStream(subscription, serde, serde2, consumer.plainStream$default$4()).mapZIO(committableRecord -> {
            Object key = committableRecord.record().key();
            if (key != null ? key.equals(stateKey) : stateKey == null) {
                return ZIO$ScopedPartiallyApplied$.MODULE$.apply$extension(ZIO$.MODULE$.scoped(), () -> {
                    Offset offset = committableRecord.offset();
                    return transactionalProducer.createTransaction().flatMap(transaction -> {
                        EnrichedBoundedEnqueue enrichedBoundedEnqueue = new EnrichedBoundedEnqueue(enqueue, nonEmptyChunk -> {
                            return new Tuple2(new Tamer.TxInfo.Context(transaction), nonEmptyChunk.toChunk());
                        });
                        return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                            return "consumer group " + str2 + " consumed state " + committableRecord.record().value() + " from " + Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset));
                        })).$times$greater(() -> {
                            return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                return "invoking the iteration function under " + str2;
                            });
                        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:159)").$times$greater(() -> {
                            return ((ZIO) function2.apply(committableRecord.record().value(), enrichedBoundedEnqueue)).flatMap(obj -> {
                                return Promise$.MODULE$.make("tamer.Tamer.sourceStream.stateStream(Tamer.scala:166)").flatMap(promise -> {
                                    return enqueue.offer(new Tuple2(new Tamer.TxInfo.Delimiter(promise), Chunk$.MODULE$.empty()), "tamer.Tamer.sourceStream.stateStream(Tamer.scala:169)").$times$greater(() -> {
                                        return promise.await("tamer.Tamer.sourceStream.stateStream(Tamer.scala:172)");
                                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:169)");
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:166)").$times$greater(() -> {
                                    return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                        return "consumer group " + str2 + " will commit offset " + Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset));
                                    });
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:173)").$less$times(() -> {
                                    return transaction.produce(str, stateKey, obj, serde, serde2, new Some(offset)).tap(recordMetadata -> {
                                        return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                                            return "pushed state " + obj + " to " + recordMetadata + " for " + str2;
                                        });
                                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:177)");
                                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:174)");
                            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:161)");
                        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:161)");
                    }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:156)");
                }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:154)");
            }
            Offset offset = committableRecord.offset();
            return ((ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                return "consumer group " + str2 + " ignored state (wrong key: " + committableRecord.record().key() + " != " + stateKey + ") from " + Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset));
            })).$times$greater(() -> {
                return offset.commitOrRetry(MODULE$.retries());
            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:183)").$less$times(() -> {
                return (ZIO) LogWriterOps$.MODULE$.debug$extension(LogWriter$.MODULE$.loggerSyntax(logWriter), () -> {
                    return "consumer group " + str2 + " committed offset " + Tamer$OffsetOps$.MODULE$.info$extension(MODULE$.OffsetOps(offset));
                });
            }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:184)");
        }, "tamer.Tamer.sourceStream.stateStream(Tamer.scala:152)");
        return ZStream$.MODULE$.fromZIO(() -> {
            return flatMap;
        }, "tamer.Tamer.sourceStream(Tamer.scala:188)").drain("tamer.Tamer.sourceStream(Tamer.scala:188)").$plus$plus(() -> {
            return mapZIO;
        }, "tamer.Tamer.sourceStream(Tamer.scala:188)");
    }

    public final <R, K, V, SV> ZLayer<KafkaConfig, Throwable, Tamer> live(Setup<R, K, V, SV> setup, package.Tag<K> tag, package.Tag<V> tag2, package.Tag<SV> tag3) {
        return Tamer$LiveTamer$.MODULE$.getLayer(setup, tag, tag2, tag3);
    }

    public static final /* synthetic */ boolean $anonfun$sourceStream$6(Tuple2 tuple2, Tuple2 tuple22) {
        Tuple2 tuple23 = new Tuple2(tuple2, tuple22);
        if (tuple23 == null) {
            return false;
        }
        Tuple2 tuple24 = (Tuple2) tuple23._1();
        Tuple2 tuple25 = (Tuple2) tuple23._2();
        if (tuple24 == null) {
            return false;
        }
        TopicPartition topicPartition = (TopicPartition) tuple24._1();
        long _2$mcJ$sp = tuple24._2$mcJ$sp();
        if (tuple25 == null) {
            return false;
        }
        TopicPartition topicPartition2 = (TopicPartition) tuple25._1();
        Some some = (Option) tuple25._2();
        if (!(some instanceof Some)) {
            return false;
        }
        OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata) some.value();
        if (topicPartition != null ? topicPartition.equals(topicPartition2) : topicPartition2 == null) {
            if (_2$mcJ$sp > offsetAndMetadata.offset()) {
                return true;
            }
        }
        return false;
    }

    private static final ZIO decide$1(Map map, Map map2) {
        Ordering by = scala.package$.MODULE$.Ordering().by(topicPartition -> {
            return new Tuple2(topicPartition.topic(), BoxesRunTime.boxToInteger(topicPartition.partition()));
        }, Ordering$.MODULE$.Tuple2(Ordering$String$.MODULE$, Ordering$Int$.MODULE$));
        Ordering on = by.on(tuple2 -> {
            return (TopicPartition) tuple2._1();
        });
        Ordering on2 = by.on(tuple22 -> {
            return (TopicPartition) tuple22._1();
        });
        Function2 function2 = (tuple23, tuple24) -> {
            return BoxesRunTime.boxToBoolean($anonfun$sourceStream$6(tuple23, tuple24));
        };
        Set keySet = map2.keySet();
        Set keySet2 = map.keySet();
        if (keySet != null ? keySet.equals(keySet2) : keySet2 == null) {
            if (map.values().forall(option -> {
                return BoxesRunTime.boxToBoolean(option.isEmpty());
            })) {
                return ZIO$.MODULE$.succeed(() -> {
                    return Tamer$StartupDecision$Initialize$.MODULE$;
                }, "tamer.Tamer.sourceStream.decide(Tamer.scala:119)");
            }
        }
        return ((List) ((StrictOptimizedIterableOps) map2.toList().sorted(on)).zip((IterableOnce) map.toList().sorted(on2))).forall(function2.tupled()) ? ZIO$.MODULE$.succeed(() -> {
            return Tamer$StartupDecision$Resume$.MODULE$;
        }, "tamer.Tamer.sourceStream.decide(Tamer.scala:120)") : ZIO$.MODULE$.fail(() -> {
            return TamerError$.MODULE$.apply("Tamer is stuck, it will not proceed unless state is restored manually");
        }, "tamer.Tamer.sourceStream.decide(Tamer.scala:121)");
    }

    private Tamer$() {
    }
}
