package io.floodplain.debezium.postgres;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.floodplain.ChangeRecord;
import java.nio.charset.Charset;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.coroutines.jvm.internal.DebugMetadata;
import kotlin.coroutines.jvm.internal.SuspendLambda;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.Charsets;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.GlobalScope;
import kotlinx.coroutines.channels.ChannelsKt;
import kotlinx.coroutines.channels.ProduceKt;
import kotlinx.coroutines.channels.ProducerScope;
import kotlinx.coroutines.channels.SendChannel;
import mu.KLogger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: DebeziumSource.kt */
@Metadata(mv = {1, 4, 1}, bv = {1, 0, 3}, k = 3, d1 = {"��\u0012\n��\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010��\u001a\u00020\u0001*\b\u0012\u0004\u0012\u00020\u00030\u0002H\u008a@¢\u0006\u0004\b\u0004\u0010\u0005"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/channels/ProducerScope;", "Lio/floodplain/ChangeRecord;", "invoke", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"})
@DebugMetadata(f = "DebeziumSource.kt", l = {141}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "io.floodplain.debezium.postgres.DebeziumSourceKt$runDebeziumServer$1")
/* loaded from: input_file:io/floodplain/debezium/postgres/DebeziumSourceKt$runDebeziumServer$1.class */
public final class DebeziumSourceKt$runDebeziumServer$1 extends SuspendLambda implements Function2<ProducerScope<? super ChangeRecord>, Continuation<? super Unit>, Object> {
    // Receiver stored by create(); invokeSuspend casts it to ProducerScope on first entry.
    private /* synthetic */ Object L$0;
    // Coroutine state-machine label: 0 on first entry, set to 1 just before the awaitClose call.
    int label;
    // Configuration handed to the Debezium engine builder through using(...).
    final /* synthetic */ Properties $props;
    // Receives the built engine via setEngine(...); its kill() is invoked when a send is cancelled.
    final /* synthetic */ EngineKillSwitch $engineKillSwitch;
    // Running total of the milliseconds measured inside each accept() call of the engine's consumer.
    final /* synthetic */ AtomicLong $totalTimeInSend;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* compiled from: DebeziumSource.kt */
    @Metadata(mv = {1, 4, 1}, bv = {1, 0, 3}, k = 3, d1 = {"��\u000e\n��\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\u0010��\u001a\u00020\u0001*\u00020\u0002H\u008a@¢\u0006\u0004\b\u0003\u0010\u0004"}, d2 = {"<anonymous>", "", "Lkotlinx/coroutines/CoroutineScope;", "invoke", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;"})
    @DebugMetadata(f = "DebeziumSource.kt", l = {}, i = {}, s = {}, n = {}, m = "invokeSuspend", c = "io.floodplain.debezium.postgres.DebeziumSourceKt$runDebeziumServer$1$1")
    /* renamed from: io.floodplain.debezium.postgres.DebeziumSourceKt$runDebeziumServer$1$1, reason: invalid class name */
    /* loaded from: input_file:io/floodplain/debezium/postgres/DebeziumSourceKt$runDebeziumServer$1$1.class */
    /*
     * Coroutine body that runs the Debezium engine to completion and logs how long it ran,
     * followed by the total send time accumulated by the enclosing lambda.
     *
     * This is an inner (non-static) class on purpose: invokeSuspend reads $totalTimeInSend
     * through the enclosing instance, and a qualified `this` is not available from a static
     * nested class.
     */
    public final class AnonymousClass1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
        /** Coroutine state-machine label; invokeSuspend only handles state 0. */
        int label;
        /** Engine whose run() is invoked by this coroutine. */
        final /* synthetic */ DebeziumEngine $engine;

        /**
         * Runs the engine, then logs the elapsed wall-clock milliseconds and the total send time.
         *
         * @param obj value supplied by the coroutine machinery; ResultKt.throwOnFailure is applied to it first
         * @return Unit.INSTANCE once the engine's run() has returned
         * @throws IllegalStateException if the label is anything other than 0
         */
        @Nullable
        public final Object invokeSuspend(@NotNull Object obj) {
            // Result is not needed here (this body never suspends); call retained from the original.
            IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
                case 0:
                    ResultKt.throwOnFailure(obj);
                    KLogger timingLogger = DebeziumSourceKt.logger;
                    long startedAt = System.currentTimeMillis();
                    // run() is called inline, so this coroutine is occupied until the engine returns.
                    this.$engine.run();
                    long elapsedMillis = System.currentTimeMillis() - startedAt;
                    timingLogger.info("Engine ran for: " + elapsedMillis);
                    KLogger summaryLogger = DebeziumSourceKt.logger;
                    summaryLogger.info("Debezium source engine terminated. Total time in send: " + DebeziumSourceKt$runDebeziumServer$1.this.$totalTimeInSend.get());
                    return Unit.INSTANCE;
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
        }

        /**
         * Captures the engine to run.
         *
         * @param debeziumEngine engine whose run() will be invoked
         * @param continuation   completion continuation forwarded to SuspendLambda (may be null)
         */
        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass1(DebeziumEngine debeziumEngine, Continuation continuation) {
            super(2, continuation);
            this.$engine = debeziumEngine;
        }

        /**
         * Creates a fresh instance of this coroutine body bound to the given completion.
         *
         * @param obj          receiver value; not stored by this lambda
         * @param continuation completion continuation; must not be null
         * @return a new AnonymousClass1 sharing this instance's engine
         */
        @NotNull
        public final Continuation<Unit> create(@Nullable Object obj, @NotNull Continuation<?> continuation) {
            Intrinsics.checkNotNullParameter(continuation, "completion");
            return new AnonymousClass1(this.$engine, continuation);
        }

        /**
         * Function2 entry point: builds a new instance via create() and starts it.
         *
         * @param obj  receiver value forwarded to create()
         * @param obj2 completion continuation
         * @return whatever invokeSuspend returns for the new instance
         */
        public final Object invoke(Object obj, Object obj2) {
            // create() is typed as Continuation<Unit>, which does not declare invokeSuspend;
            // it always builds an AnonymousClass1, so cast before starting it.
            AnonymousClass1 instance = (AnonymousClass1) create(obj, (Continuation) obj2);
            return instance.invokeSuspend(Unit.INSTANCE);
        }
    }

    /**
     * State machine for the producer lambda.
     *
     * <p>State 0 builds a Debezium engine from {@code $props} whose consumer forwards every change
     * event into the producer channel as a {@code ChangeRecord}, registers the engine with the kill
     * switch, launches the engine on {@code GlobalScope}, and then calls {@code awaitClose} with a
     * callback that closes the engine. State 1 is the resumption after that {@code awaitClose} call.
     *
     * @param obj value supplied by the coroutine machinery; {@code ResultKt.throwOnFailure} is applied to it
     * @return the COROUTINE_SUSPENDED marker if {@code awaitClose} returned it, otherwise {@code Unit.INSTANCE}
     * @throws IllegalStateException if the label is neither 0 nor 1
     */
    @Nullable
    public final Object invokeSuspend(@NotNull Object obj) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure(obj);
                // L$0 was stored by create(); it is the producer scope that events are sent into.
                final ProducerScope producerScope = (ProducerScope) this.L$0;
                final DebeziumEngine<ChangeEvent<String, String>> build = DebeziumEngine.create(Json.class).using(this.$props).notifying(new Consumer<ChangeEvent<String, String>>() { // from class: io.floodplain.debezium.postgres.DebeziumSourceKt$runDebeziumServer$1$engine$1
                    @Override // java.util.function.Consumer
                    public final void accept(@NotNull ChangeEvent<String, String> changeEvent) {
                        KLogger kLogger;
                        KLogger kLogger2;
                        byte[] bArr;
                        Intrinsics.checkNotNullParameter(changeEvent, "record");
                        // Drop the event without timing it when the channel no longer accepts sends.
                        if (producerScope.isClosedForSend()) {
                            return;
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        try {
                            SendChannel sendChannel = producerScope;
                            String destination = changeEvent.destination();
                            Intrinsics.checkNotNullExpressionValue(destination, "record.destination()");
                            Object key = changeEvent.key();
                            Intrinsics.checkNotNullExpressionValue(key, "record.key()");
                            String str = (String) key;
                            String str2 = (String) changeEvent.value();
                            // A null value is forwarded as a null payload; otherwise it is encoded as UTF-8 bytes.
                            if (str2 != null) {
                                Charset charset = Charsets.UTF_8;
                                // Unreachable: str2 was already checked non-null by the enclosing if.
                                if (str2 == null) {
                                    throw new NullPointerException("null cannot be cast to non-null type java.lang.String");
                                }
                                bArr = str2.getBytes(charset);
                                Intrinsics.checkNotNullExpressionValue(bArr, "(this as java.lang.String).getBytes(charset)");
                            } else {
                                bArr = null;
                            }
                            ChannelsKt.sendBlocking(sendChannel, new ChangeRecord(destination, str, bArr));
                        } catch (CancellationException e) {
                            // Cancellation is not rethrown: log it, trigger the kill switch, and flag the
                            // calling thread as interrupted.
                            kLogger = DebeziumSourceKt.logger;
                            kLogger.info("engine cancelled");
                            DebeziumSourceKt$runDebeziumServer$1.this.$engineKillSwitch.kill();
                            Thread.currentThread().interrupt();
                        }
                        // Reached after both a successful send and a caught cancellation, so the elapsed
                        // time is recorded in either case.
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        // Only calls that took longer than 1000 ms are logged individually.
                        if (currentTimeMillis2 > 1000) {
                            kLogger2 = DebeziumSourceKt.logger;
                            kLogger2.debug("Send blocking ran for: " + currentTimeMillis2);
                        }
                        DebeziumSourceKt$runDebeziumServer$1.this.$totalTimeInSend.addAndGet(currentTimeMillis2);
                    }
                }).build();
                // Hand the engine to the kill switch before it is started.
                this.$engineKillSwitch.setEngine(build);
                // The engine is launched on GlobalScope; the returned value of launch is not kept.
                BuildersKt.launch$default(GlobalScope.INSTANCE, (CoroutineContext) null, (CoroutineStart) null, new AnonymousClass1(build, null), 3, (Object) null);
                // Callback passed to awaitClose below; its only action is to close the engine.
                Function0<Unit> function0 = new Function0<Unit>() { // from class: io.floodplain.debezium.postgres.DebeziumSourceKt$runDebeziumServer$1.2
                    public /* bridge */ /* synthetic */ Object invoke() {
                        m3invoke();
                        return Unit.INSTANCE;
                    }

                    /* renamed from: invoke, reason: collision with other method in class */
                    public final void m3invoke() {
                        build.close();
                    }

                    // NOTE(review): super(0) inside an initializer looks like a decompiler artifact of the
                    // original lambda class's constructor — confirm before relying on this as Java source.
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(0);
                    }
                };
                // Record the resume state before the call that may suspend.
                this.label = 1;
                if (ProduceKt.awaitClose(producerScope, function0, this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                // Resumed after awaitClose: surface any failure carried in obj, then finish.
                ResultKt.throwOnFailure(obj);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return Unit.INSTANCE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Captures the values the producer lambda closes over.
     *
     * @param properties       configuration later passed to the Debezium engine builder
     * @param engineKillSwitch kill switch that is given the built engine
     * @param atomicLong       counter that accumulates the time measured in the engine's consumer
     * @param continuation     completion continuation forwarded to SuspendLambda
     */
    public DebeziumSourceKt$runDebeziumServer$1(Properties properties, EngineKillSwitch engineKillSwitch, AtomicLong atomicLong, Continuation continuation) {
        super(2, continuation);
        // The three captures are independent of each other; stored last-to-first.
        this.$totalTimeInSend = atomicLong;
        this.$engineKillSwitch = engineKillSwitch;
        this.$props = properties;
    }

    /**
     * Builds a new instance of this lambda that shares the captured values and is bound to the
     * given receiver and completion.
     *
     * @param obj          receiver to store in L$0 of the new instance
     * @param continuation completion continuation; must not be null
     * @return the newly created lambda instance
     */
    @NotNull
    public final Continuation<Unit> create(@Nullable Object obj, @NotNull Continuation<?> continuation) {
        Intrinsics.checkNotNullParameter(continuation, "completion");
        DebeziumSourceKt$runDebeziumServer$1 fresh =
                new DebeziumSourceKt$runDebeziumServer$1(
                        this.$props,
                        this.$engineKillSwitch,
                        this.$totalTimeInSend,
                        continuation);
        fresh.L$0 = obj;
        return fresh;
    }

    /**
     * Function2 entry point: builds a new instance via create() and starts it.
     *
     * @param obj  receiver forwarded to create()
     * @param obj2 completion continuation
     * @return whatever invokeSuspend returns for the new instance
     */
    public final Object invoke(Object obj, Object obj2) {
        // create() is typed as Continuation<Unit>, which does not declare invokeSuspend;
        // it always builds an instance of this class, so cast before starting it.
        DebeziumSourceKt$runDebeziumServer$1 instance =
                (DebeziumSourceKt$runDebeziumServer$1) create(obj, (Continuation) obj2);
        return instance.invokeSuspend(Unit.INSTANCE);
    }
}
