package com.datastax.ebdrivers.kafkaproducer;

import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/datastax/ebdrivers/kafkaproducer/KafkaProducerActivity.class */
/**
 * Activity that reads Kafka producer settings from the activity parameters and
 * plans a sequence of {@link KafkaStatement} operations from a workload YAML.
 *
 * <p>Recognized activity parameters (with the defaults applied in this class):
 * <ul>
 *   <li>{@code yaml} / {@code workload} - location of the workload definition (required)</li>
 *   <li>{@code host} / {@code hosts} - comma-separated server list, default {@code localhost:9092};
 *       entries without a {@code :} get {@code :9092} appended</li>
 *   <li>{@code clientid} / {@code client.id} / {@code client_id} - default {@code TestProducerClientId}</li>
 *   <li>{@code schema_registry_url} / {@code schema.registry.url} - default {@code http://localhost:8081}</li>
 *   <li>{@code seq} - name of a {@link SequencerType} constant, default {@code bucket}</li>
 *   <li>{@code tags} - tag filter passed to the statement loader, default empty</li>
 * </ul>
 */
public class KafkaProducerActivity extends SimpleActivity {
    private static final Logger logger = LogManager.getLogger(KafkaProducerActivity.class);

    private String yamlLoc;
    private String clientId;
    private String servers;
    private OpSequence<KafkaStatement> opSequence;
    private String schemaRegistryUrl;

    // Package-private: left accessible to collaborators in this package.
    Timer resultTimer;
    Timer resultSuccessTimer;

    /**
     * Creates the activity for the given definition.
     *
     * @param activityDef the activity definition handed to the superclass
     */
    public KafkaProducerActivity(ActivityDef activityDef) {
        super(activityDef);
    }

    /**
     * Re-reads the connection related settings from the activity parameters.
     *
     * @param activityDef the (possibly updated) activity definition
     * @throws IllegalArgumentException if neither {@code yaml} nor {@code workload} is set
     */
    public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
        super.onActivityDefUpdate(activityDef);

        this.yamlLoc = activityDef.getParams()
            .getOptionalString("yaml", "workload")
            .orElseThrow(() -> new IllegalArgumentException("yaml is not defined"));

        String hostList = activityDef.getParams()
            .getOptionalString("host", "hosts")
            .orElse("localhost:9092");
        // Any entry that carries no ':' separator gets the default port appended.
        this.servers = Arrays.stream(hostList.split(","))
            .map(host -> host.indexOf(':') == -1 ? host + ":9092" : host)
            .collect(Collectors.joining(","));

        this.clientId = activityDef.getParams()
            .getOptionalString("clientid", "client.id", "client_id")
            .orElse("TestProducerClientId");

        this.schemaRegistryUrl = activityDef.getParams()
            .getOptionalString("schema_registry_url", "schema.registry.url")
            .orElse("http://localhost:8081");
    }

    /**
     * Loads the settings, plans the op sequence and registers the
     * {@code result} and {@code result-success} timers.
     */
    public void initActivity() {
        logger.debug("initializing activity: {}", this.activityDef.getAlias());
        // Populate yamlLoc/servers/clientId/schemaRegistryUrl before statements are built from them.
        onActivityDefUpdate(this.activityDef);
        this.opSequence = initOpSequencer();
        setDefaultsFromOpSequence(this.opSequence);
        this.resultTimer = ActivityMetrics.timer(this.activityDef, "result");
        this.resultSuccessTimer = ActivityMetrics.timer(this.activityDef, "result-success");
    }

    /**
     * Builds the op sequence: one {@link KafkaStatement} per statement template
     * selected by the {@code tags} filter, weighted by the template's
     * {@code ratio} parameter (default 1).
     *
     * <p>If no statement is selected, an error is logged and the planner is
     * resolved without any ops.
     *
     * @return the resolved operation sequence
     */
    private OpSequence<KafkaStatement> initOpSequencer() {
        SequencerType sequencerType =
            SequencerType.valueOf(getParams().getOptionalString("seq").orElse("bucket"));
        SequencePlanner<KafkaStatement> planner = new SequencePlanner<>(sequencerType);

        String tagFilter = this.activityDef.getParams().getOptionalString("tags").orElse("");
        List<OpTemplate> templates = StatementsLoader
            .loadPath(logger, this.yamlLoc, this.activityDef.getParams(), "activities")
            .getStmts(tagFilter);

        if (templates.isEmpty()) {
            logger.error("Unable to create a Kafka statement if you have no active statements.");
        } else {
            for (OpTemplate template : templates) {
                // The ratio is read from the same template the statement is built from.
                int ratio = (Integer) template.getParamOrDefault("ratio", 1);
                planner.addOp(
                    new KafkaStatement(template, this.servers, this.clientId, this.schemaRegistryUrl),
                    ratio);
            }
        }
        return planner.resolve();
    }

    /**
     * Returns the op sequence planned by {@link #initActivity()}.
     *
     * @return the planned sequence, or {@code null} if {@link #initActivity()} has not run yet
     */
    public OpSequence<KafkaStatement> getOpSequencer() {
        return this.opSequence;
    }
}
