package io.infinitic.pulsar.workers;

import io.infinitic.common.messages.Envelope;
import io.infinitic.common.tasks.data.TaskName;
import io.infinitic.metrics.perName.engine.storage.MetricsPerNameStateStorage;
import io.infinitic.metrics.perName.engine.worker.StartMetricsPerNameEngineKt;
import io.infinitic.pulsar.topics.TaskTopic;
import io.infinitic.pulsar.transport.PulsarConsumerFactory;
import io.infinitic.pulsar.transport.PulsarOutput;
import kotlin.Metadata;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.channels.ChannelKt;
import kotlinx.coroutines.channels.ReceiveChannel;
import kotlinx.coroutines.channels.SendChannel;
import org.apache.pulsar.client.api.Consumer;
import org.jetbrains.annotations.NotNull;

/* compiled from: startPulsarMetricsPerNameEngines.kt */
@Metadata(mv = {1, 5, 1}, k = 2, xi = 48, d1 = {"��4\n��\n\u0002\u0010\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\u001a2\u0010��\u001a\u00020\u0001*\u00020\u00022\u0006\u0010\u0003\u001a\u00020\u00042\u0006\u0010\u0005\u001a\u00020\u00062\u0006\u0010\u0007\u001a\u00020\b2\u0006\u0010\t\u001a\u00020\n2\u0006\u0010\u000b\u001a\u00020\f*\u0016\u0010\r\"\b\u0012\u0004\u0012\u00020\u000f0\u000e2\b\u0012\u0004\u0012\u00020\u000f0\u000e¨\u0006\u0010"}, d2 = {"startPulsarMetricsPerNameEngines", "", "Lkotlinx/coroutines/CoroutineScope;", "taskName", "Lio/infinitic/common/tasks/data/TaskName;", "consumerName", "", "consumerFactory", "Lio/infinitic/pulsar/transport/PulsarConsumerFactory;", "storage", "Lio/infinitic/metrics/perName/engine/storage/MetricsPerNameStateStorage;", "pulsarOutput", "Lio/infinitic/pulsar/transport/PulsarOutput;", "PulsarMetricsPerNameMessageToProcess", "Lio/infinitic/pulsar/transport/PulsarMessageToProcess;", "Lio/infinitic/common/metrics/perName/messages/MetricsPerNameMessage;", "infinitic-pulsar"})
/* loaded from: input_file:io/infinitic/pulsar/workers/StartPulsarMetricsPerNameEnginesKt.class */
public final class StartPulsarMetricsPerNameEnginesKt {
    /**
     * Starts a metrics-per-name engine and connects it to a Pulsar consumer.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Two channels are created with all of Kotlin's default {@code Channel(...)} arguments.</li>
     *   <li>An engine named {@code "metrics-per-name"} is started in {@code coroutineScope}, given
     *       the storage, the first channel as its receive side, the second channel as its send
     *       side, and {@code pulsarOutput.sendToMetricsGlobal()}.</li>
     *   <li>A consumer is created on {@link TaskTopic#METRICS} for {@code taskName}.</li>
     *   <li>{@code StartPulsarKt.pullMessages} is called with the consumer and the first channel,
     *       and {@code StartPulsarKt.acknowledgeMessages} with the consumer and the second channel.</li>
     * </ol>
     *
     * @param coroutineScope scope passed to the engine and to the pull/acknowledge helpers
     *                       (the Kotlin extension receiver)
     * @param taskName task whose metrics topic is consumed
     * @param str consumer name handed to the consumer factory ({@code consumerName} in Kotlin)
     * @param pulsarConsumerFactory factory used to create the Pulsar consumer
     * @param metricsPerNameStateStorage state storage handed to the engine
     * @param pulsarOutput provides the {@code sendToMetricsGlobal()} value handed to the engine
     * @throws NullPointerException if any argument is {@code null} (Kotlin parameter null-checks)
     */
    public static final void startPulsarMetricsPerNameEngines(
            @NotNull CoroutineScope coroutineScope,
            @NotNull TaskName taskName,
            @NotNull String str,
            @NotNull PulsarConsumerFactory pulsarConsumerFactory,
            @NotNull MetricsPerNameStateStorage metricsPerNameStateStorage,
            @NotNull PulsarOutput pulsarOutput) {
        // Compiler-generated null checks; the names are the original Kotlin parameter names.
        Intrinsics.checkNotNullParameter(coroutineScope, "<this>");
        Intrinsics.checkNotNullParameter(taskName, "taskName");
        Intrinsics.checkNotNullParameter(str, "consumerName");
        Intrinsics.checkNotNullParameter(pulsarConsumerFactory, "consumerFactory");
        Intrinsics.checkNotNullParameter(metricsPerNameStateStorage, "storage");
        Intrinsics.checkNotNullParameter(pulsarOutput, "pulsarOutput");

        // Default-mask 7 means all three Channel(...) arguments take their Kotlin defaults.
        // Each object is a full Channel, so it can be viewed as either end.
        SendChannel toEngine = ChannelKt.Channel$default(0, (BufferOverflow) null, (Function1) null, 7, (Object) null);
        ReceiveChannel fromEngine = ChannelKt.Channel$default(0, (BufferOverflow) null, (Function1) null, 7, (Object) null);

        // The engine uses the opposite end of each channel from the one the Pulsar helpers use.
        ReceiveChannel engineInput = (ReceiveChannel) toEngine;
        SendChannel engineOutput = (SendChannel) fromEngine;
        StartMetricsPerNameEngineKt.startMetricsPerNameEngine(
                coroutineScope,
                "metrics-per-name",
                metricsPerNameStateStorage,
                engineInput,
                engineOutput,
                pulsarOutput.sendToMetricsGlobal());

        // One consumer is shared by the pull and the acknowledge helpers.
        // NOTE(review): presumably pullMessages feeds consumed messages into toEngine and
        // acknowledgeMessages acks whatever arrives on fromEngine; confirm in StartPulsarKt.
        Consumer<? extends Envelope<?>> metricsConsumer =
                pulsarConsumerFactory.newConsumer(str, TaskTopic.METRICS, taskName);
        StartPulsarKt.pullMessages(coroutineScope, metricsConsumer, toEngine);
        StartPulsarKt.acknowledgeMessages(coroutineScope, metricsConsumer, fromEngine);
    }
}
