package fs2.aws.dynamodb;

import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.std.Dispatcher;
import cats.effect.std.Dispatcher$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.syntax.package$flatMap$;
import cats.syntax.package$functor$;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.Record;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$NestedStreamOps$;
import fs2.aws.core.package$;
import fs2.concurrent.SignallingRef;
import fs2.concurrent.SignallingRef$;
import scala.DummyImplicit$;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.NotGiven$;

/* compiled from: DynamoDB.scala */
/* loaded from: input_file:fs2/aws/dynamodb/DynamoDB.class */
/**
 * Operations for reading a DynamoDB stream as an fs2 {@code Stream} of
 * {@code CommittableRecord}s and for checkpointing those records.
 *
 * <p>This file is decompiler output for {@code DynamoDB.scala}; identifiers containing
 * {@code $} (for example {@code evidence$1}, {@code MODULE$}) are compiler-generated.
 *
 * @param <F> the effect type in which all streams and actions are expressed
 */
public interface DynamoDB<F> {

    /* compiled from: DynamoDB.scala */
    /* loaded from: input_file:fs2/aws/dynamodb/DynamoDB$GenericKinesis.class */
    /**
     * Partial implementation of {@link DynamoDB} that provides
     * {@code readChunksFromDynamoDBStream} and {@code checkpointRecords}. The two
     * {@code readFromDynamoDBStream} overloads are not implemented here and are left to
     * subclasses.
     *
     * @param <F> the effect type
     */
    public static abstract class GenericKinesis<F> implements DynamoDB<F> {
        // Async instance for F; handed to every effect and stream combinator in this class.
        private final Async<F> evidence$1;

        /**
         * Stores the {@code Async} instance.
         *
         * @param async         type-class instance kept in {@code evidence$1}
         * @param genConcurrent accepted but neither stored nor used by this class
         */
        public GenericKinesis(Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
            this.evidence$1 = async;
        }

        /**
         * Builds a stream that emits every {@code CommittableRecord} handed to the record
         * processor created by {@code instantiateScheduler$1}.
         *
         * <p>Steps, in order: acquire a {@code Dispatcher} as a resource, create an unbounded
         * queue, create a {@code SignallingRef} initialised to {@code false}, set up the worker
         * via {@code instantiateScheduler$1}, then read from the queue with
         * {@code interruptWhen(signallingRef)}.
         *
         * @param function1 given an {@code IRecordProcessorFactory}, produces (inside {@code F})
         *                  the worker that is subsequently run and shut down
         * @return a stream of the records offered to the internal queue
         */
        public Stream<F, CommittableRecord> readChunksFromDynamoDBStream(Function1<IRecordProcessorFactory, F> function1) {
            // NOTE(review): the literal `true` is presumably Dispatcher.parallel's `await` flag - confirm against the cats-effect version in use.
            return Stream$.MODULE$.resource(Dispatcher$.MODULE$.parallel(true, this.evidence$1), this.evidence$1).flatMap(dispatcher -> {
                return Stream$.MODULE$.eval(Queue$.MODULE$.unbounded(this.evidence$1)).flatMap(queue -> {
                    return Stream$.MODULE$.eval(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), this.evidence$1)).flatMap(signallingRef -> {
                        // The worker value itself is not used below; this step exists for its acquire/release effects.
                        return instantiateScheduler$1(function1, dispatcher, queue, signallingRef).flatMap(worker -> {
                            // signallingRef is set to true once worker.run() returns (see instantiateScheduler$1).
                            // The trailing identity map looks like a residue of a Scala for-comprehension.
                            return Stream$.MODULE$.fromQueueUnterminated(queue, Stream$.MODULE$.fromQueueUnterminated$default$2(), this.evidence$1).interruptWhen(signallingRef, this.evidence$1).map(committableRecord -> {
                                return committableRecord;
                            });
                        }, NotGiven$.MODULE$.value());
                    }, NotGiven$.MODULE$.value());
                }, NotGiven$.MODULE$.value());
            }, NotGiven$.MODULE$.value());
        }

        /**
         * Returns a pipe that groups incoming records by {@code shardId()}, sends each group's
         * sub-stream through both {@code checkpoint$1} and {@code bypass$1} via
         * {@code broadcastThrough}, and merges all resulting streams with
         * {@code parJoinUnbounded}.
         *
         * @param kinesisCheckpointSettings supplies {@code maxBatchSize()} and
         *                                  {@code maxBatchWait()} to the checkpoint pipe
         * @return a function from a stream of {@code CommittableRecord} to a stream of
         *         {@code Record}
         */
        @Override // fs2.aws.dynamodb.DynamoDB
        public Function1<Stream<F, CommittableRecord>, Stream<F, Record>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings) {
            return stream -> {
                // Grouping key: the record's shardId, lifted into F with pure.
                return Stream$NestedStreamOps$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.NestedStreamOps(stream.through(package$.MODULE$.groupBy(committableRecord -> {
                    return cats.effect.package$.MODULE$.Sync().apply(this.evidence$1).pure(committableRecord.shardId());
                }, this.evidence$1)).map(tuple2 -> {
                    // Only the second tuple element (the per-key stream) is used; the key is discarded.
                    if (tuple2 != null) {
                        return ((Stream) tuple2._2()).broadcastThrough(ScalaRunTime$.MODULE$.wrapRefArray(new Function1[]{checkpoint$1(kinesisCheckpointSettings), bypass$1()}), this.evidence$1);
                    }
                    // Compiler-generated fallthrough for the Scala tuple pattern match.
                    throw new MatchError(tuple2);
                })), this.evidence$1);
            };
        }

        /** Calls {@code worker.run()}; invoked from the {@code blocking} section of the started fiber. */
        private final void instantiateScheduler$1$$anonfun$2$$anonfun$1(Worker worker) {
            worker.run();
        }

        /** Calls {@code worker.shutdown()}; invoked from the bracket's release action. */
        private final void instantiateScheduler$1$$anonfun$3$$anonfun$1(Worker worker) {
            worker.shutdown();
        }

        /**
         * Wraps worker creation, start-up and shutdown in {@code Stream.bracket}.
         *
         * <p>Acquire: applies {@code function1} to a factory that creates a
         * {@code RecordProcessor} whose callback offers each record to {@code queue} through
         * {@code dispatcher.unsafeRunSync}. It then starts a fiber that executes
         * {@code worker.run()} inside {@code blocking} and afterwards sets
         * {@code signallingRef} to {@code true}.
         *
         * <p>Release: executes {@code worker.shutdown()} inside {@code blocking}.
         *
         * @param function1     produces the worker from a record-processor factory
         * @param dispatcher    used to run the queue offer synchronously from the callback
         * @param queue         receives every record passed to the processor callback
         * @param signallingRef set to {@code true} after {@code worker.run()} returns
         * @return a stream built by {@code Stream.bracket} around the worker
         */
        private final Stream instantiateScheduler$1(Function1 function1, Dispatcher dispatcher, Queue queue, SignallingRef signallingRef) {
            return Stream$.MODULE$.bracket(package$flatMap$.MODULE$.toFlatMapOps(function1.apply(() -> {
                return new RecordProcessor(committableRecord -> {
                    // Synchronous hand-off from the processor callback into the effectful queue.
                    dispatcher.unsafeRunSync(queue.offer(committableRecord));
                });
            }), this.evidence$1).flatTap(worker -> {
                // start(...) forks the blocking run so that acquire itself does not wait for run() to finish.
                return cats.effect.package$.MODULE$.Concurrent().apply(this.evidence$1, DummyImplicit$.MODULE$.dummyImplicit()).start(package$flatMap$.MODULE$.toFlatMapOps(cats.effect.package$.MODULE$.Async().apply(this.evidence$1).blocking(() -> {
                    instantiateScheduler$1$$anonfun$2$$anonfun$1(worker);
                    return BoxedUnit.UNIT;
                }), this.evidence$1).flatTap(boxedUnit -> {
                    // Runs only after worker.run() has returned; used by the reader as its interrupt signal.
                    return signallingRef.set(BoxesRunTime.boxToBoolean(true));
                }));
            }), worker2 -> {
                return cats.effect.package$.MODULE$.Async().apply(this.evidence$1).blocking(() -> {
                    instantiateScheduler$1$$anonfun$3$$anonfun$1(worker2);
                    return BoxedUnit.UNIT;
                });
            });
        }

        /**
         * Builds the checkpointing pipe: batches records with {@code groupWithin}, reduces each
         * batch with the partial function {@code DynamoDB$GenericKinesis$$anon$1} (defined
         * outside this view), calls {@code checkpoint} on the resulting record, and drains so
         * that the pipe emits no elements.
         *
         * @param kinesisCheckpointSettings supplies the batch size and batch wait for
         *                                  {@code groupWithin}
         * @return the checkpoint pipe as a raw {@code Function1}
         */
        private final Function1 checkpoint$1(KinesisCheckpointSettings kinesisCheckpointSettings) {
            return stream -> {
                return stream.groupWithin(kinesisCheckpointSettings.maxBatchSize(), kinesisCheckpointSettings.maxBatchWait(), this.evidence$1).collect(new DynamoDB$GenericKinesis$$anon$1()).flatMap(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    // NOTE(review): the meaning of the int passed to checkpoint(...) is decided by $$anon$1, not visible here - confirm in DynamoDB.scala.
                    int unboxToInt = BoxesRunTime.unboxToInt(tuple2._1());
                    CommittableRecord committableRecord = (CommittableRecord) tuple2._2();
                    // The value produced by as(record) is discarded by drain(); only the checkpoint effect remains.
                    return Stream$.MODULE$.eval(package$functor$.MODULE$.toFunctorOps(committableRecord.checkpoint(unboxToInt, this.evidence$1), this.evidence$1).as(committableRecord.record())).drain();
                }, NotGiven$.MODULE$.value());
            };
        }

        /**
         * Builds the pass-through pipe that maps each {@code CommittableRecord} to its
         * {@code record()}.
         *
         * @return the bypass pipe as a raw {@code Function1}
         */
        private final Function1 bypass$1() {
            return stream -> {
                return stream.map(committableRecord -> {
                    return committableRecord.record();
                });
            };
        }
    }

    /**
     * Static forwarder to {@code DynamoDB$.MODULE$.create} taking the three AWS clients.
     *
     * @param amazonDynamoDBStreams DynamoDB Streams client passed through unchanged
     * @param amazonDynamoDB        DynamoDB client passed through unchanged
     * @param amazonCloudWatch      CloudWatch client passed through unchanged
     * @param async                 {@code Async} instance for {@code F}
     * @param genConcurrent         {@code GenConcurrent} instance for {@code F}
     * @return whatever the companion object's {@code create} returns
     */
    static <F> DynamoDB<F> create(AmazonDynamoDBStreams amazonDynamoDBStreams, AmazonDynamoDB amazonDynamoDB, AmazonCloudWatch amazonCloudWatch, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return DynamoDB$.MODULE$.create(amazonDynamoDBStreams, amazonDynamoDB, amazonCloudWatch, async, genConcurrent);
    }

    /**
     * Static forwarder to {@code DynamoDB$.MODULE$.create} taking a worker-building function.
     *
     * @param function1 function from an {@code IRecordProcessorFactory} passed through unchanged
     * @param async     {@code Async} instance for {@code F}
     * @return whatever the companion object's {@code create} returns
     */
    static <F> DynamoDB<F> create(Function1<IRecordProcessorFactory, Object> function1, Async<F> async) {
        return DynamoDB$.MODULE$.create(function1, async);
    }

    /**
     * Reads a DynamoDB stream identified by two strings.
     *
     * <p>NOTE(review): the decompiler lost the parameter names; presumably an application
     * name and a stream name/ARN - confirm against {@code DynamoDB.scala}.
     *
     * @param str  first identifier (meaning not visible here)
     * @param str2 second identifier (meaning not visible here)
     * @return a stream of {@code CommittableRecord}
     */
    Stream<F, CommittableRecord> readFromDynamoDBStream(String str, String str2);

    /**
     * Reads a DynamoDB stream using an explicit client-library configuration.
     *
     * @param kinesisClientLibConfiguration configuration supplied by the caller
     * @return a stream of {@code CommittableRecord}
     */
    Stream<F, CommittableRecord> readFromDynamoDBStream(KinesisClientLibConfiguration kinesisClientLibConfiguration);

    /**
     * Returns a pipe that turns a stream of {@code CommittableRecord} into a stream of
     * {@code Record}, parameterised by the given checkpoint settings.
     *
     * @param kinesisCheckpointSettings checkpoint settings supplied by the caller
     * @return a function from a {@code CommittableRecord} stream to a {@code Record} stream
     */
    Function1<Stream<F, CommittableRecord>, Stream<F, Record>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings);
}
