package fs2.aws.kinesis;

import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.std.Dispatcher;
import cats.effect.std.Dispatcher$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.implicits$;
import eu.timepit.refined.api.RefType$;
import eu.timepit.refined.auto$;
import fs2.Chunk;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$NestedStreamOps$;
import fs2.aws.core.package$;
import fs2.concurrent.SignallingRef;
import fs2.concurrent.SignallingRef$;
import scala.DummyImplicit$;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.NotGiven$;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/* compiled from: Kinesis.scala */
/* loaded from: input_file:fs2/aws/kinesis/Kinesis.class */
public interface Kinesis<F> {

    /* compiled from: Kinesis.scala */
    /* loaded from: input_file:fs2/aws/kinesis/Kinesis$GenericKinesis.class */
    public static abstract class GenericKinesis<F> implements Kinesis<F> {
        private final Async<F> evidence$1;

        public GenericKinesis(Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
            this.evidence$1 = async;
        }

        @Override // fs2.aws.kinesis.Kinesis
        public /* bridge */ /* synthetic */ Stream readFromKinesisStream(String str, String str2) {
            return readFromKinesisStream(str, str2);
        }

        @Override // fs2.aws.kinesis.Kinesis
        public /* bridge */ /* synthetic */ Stream readFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings) {
            return readFromKinesisStream(kinesisConsumerSettings);
        }

        public Stream<F, Chunk<CommittableRecord>> readChunksFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, Function1<ShardRecordProcessorFactory, F> function1) {
            return Stream$.MODULE$.resource(Dispatcher$.MODULE$.apply(this.evidence$1), this.evidence$1).flatMap(dispatcher -> {
                return Stream$.MODULE$.eval(Queue$.MODULE$.bounded(BoxesRunTime.unboxToInt(auto$.MODULE$.autoUnwrap(BoxesRunTime.boxToInteger(kinesisConsumerSettings.bufferSize()), RefType$.MODULE$.refinedRefType())), this.evidence$1)).flatMap(queue -> {
                    return Stream$.MODULE$.eval(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), this.evidence$1)).flatMap(signallingRef -> {
                        return instantiateScheduler$1(function1, dispatcher, queue, signallingRef).flatMap(scheduler -> {
                            return Stream$.MODULE$.fromQueueUnterminated(queue, Stream$.MODULE$.fromQueueUnterminated$default$2(), this.evidence$1).interruptWhen(signallingRef, this.evidence$1).map(chunk -> {
                                return chunk;
                            });
                        }, NotGiven$.MODULE$.value());
                    }, NotGiven$.MODULE$.value());
                }, NotGiven$.MODULE$.value());
            }, NotGiven$.MODULE$.value());
        }

        @Override // fs2.aws.kinesis.Kinesis
        public Function1<Stream<F, CommittableRecord>, Stream<F, KinesisClientRecord>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings) {
            return stream -> {
                return Stream$NestedStreamOps$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.NestedStreamOps(stream.through(package$.MODULE$.groupBy(committableRecord -> {
                    return cats.effect.package$.MODULE$.Sync().apply(this.evidence$1).pure(committableRecord.shardId());
                }, this.evidence$1)).map(tuple2 -> {
                    if (tuple2 != null) {
                        return ((Stream) tuple2._2()).broadcastThrough(ScalaRunTime$.MODULE$.wrapRefArray(new Function1[]{checkpoint$1(kinesisCheckpointSettings), bypass$1()}), this.evidence$1);
                    }
                    throw new MatchError(tuple2);
                })), this.evidence$1);
            };
        }

        private final void instantiateScheduler$1$$anonfun$2$$anonfun$1(Scheduler scheduler) {
            scheduler.run();
        }

        private final void instantiateScheduler$1$$anonfun$3$$anonfun$1(Scheduler scheduler) {
            scheduler.shutdown();
        }

        private final Stream instantiateScheduler$1(Function1 function1, Dispatcher dispatcher, Queue queue, SignallingRef signallingRef) {
            return Stream$.MODULE$.bracket(implicits$.MODULE$.toFlatMapOps(function1.apply(() -> {
                return new ChunkedRecordProcessor(chunk -> {
                    dispatcher.unsafeRunSync(queue.offer(chunk));
                });
            }), this.evidence$1).flatTap(scheduler -> {
                return cats.effect.package$.MODULE$.Concurrent().apply(this.evidence$1, DummyImplicit$.MODULE$.dummyImplicit()).start(implicits$.MODULE$.toFlatMapOps(cats.effect.package$.MODULE$.Async().apply(this.evidence$1).blocking(() -> {
                    r3.instantiateScheduler$1$$anonfun$2$$anonfun$1(r4);
                }), this.evidence$1).flatTap(boxedUnit -> {
                    return signallingRef.set(BoxesRunTime.boxToBoolean(true));
                }));
            }), scheduler2 -> {
                return cats.effect.package$.MODULE$.Async().apply(this.evidence$1).blocking(() -> {
                    r1.instantiateScheduler$1$$anonfun$3$$anonfun$1(r2);
                });
            });
        }

        private final Function1 checkpoint$1(KinesisCheckpointSettings kinesisCheckpointSettings) {
            return stream -> {
                return stream.groupWithin(kinesisCheckpointSettings.maxBatchSize(), kinesisCheckpointSettings.maxBatchWait(), this.evidence$1).collect(new Kinesis$$anon$1()).flatMap(committableRecord -> {
                    return Stream$.MODULE$.eval(implicits$.MODULE$.toFunctorOps(committableRecord.checkpoint(this.evidence$1), this.evidence$1).as(committableRecord.record())).drain();
                }, NotGiven$.MODULE$.value());
            };
        }

        private final Function1 bypass$1() {
            return stream -> {
                return stream.map(committableRecord -> {
                    return committableRecord.record();
                });
            };
        }
    }

    static <F> Kinesis<F> create(Function1<ShardRecordProcessorFactory, Object> function1, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return Kinesis$.MODULE$.create(function1, async, genConcurrent);
    }

    static <F> Kinesis<F> create(KinesisAsyncClient kinesisAsyncClient, DynamoDbAsyncClient dynamoDbAsyncClient, CloudWatchAsyncClient cloudWatchAsyncClient, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return Kinesis$.MODULE$.create(kinesisAsyncClient, dynamoDbAsyncClient, cloudWatchAsyncClient, async, genConcurrent);
    }

    default Stream<F, CommittableRecord> readFromKinesisStream(String str, String str2) {
        return readFromKinesisStream(KinesisConsumerSettings$.MODULE$.apply(str2, str, KinesisConsumerSettings$.MODULE$.apply$default$3(), KinesisConsumerSettings$.MODULE$.apply$default$4(), KinesisConsumerSettings$.MODULE$.apply$default$5(), KinesisConsumerSettings$.MODULE$.apply$default$6()));
    }

    default Stream<F, CommittableRecord> readFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings) {
        return readChunkedFromKinesisStream(kinesisConsumerSettings).flatMap(chunk -> {
            return Stream$.MODULE$.chunk(chunk);
        }, NotGiven$.MODULE$.value());
    }

    Stream<F, Chunk<CommittableRecord>> readChunkedFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings);

    Function1<Stream<F, CommittableRecord>, Stream<F, KinesisClientRecord>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings);
}
