package org.apache.kafka.trogdor.workload;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/workload/ShareRoundTripWorker.class */
public class ShareRoundTripWorker extends RoundTripWorkerBase {
    private static final Logger log = LoggerFactory.getLogger(ShareRoundTripWorker.class);
    KafkaShareConsumer<byte[], byte[]> consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShareRoundTripWorker(String str, RoundTripWorkloadSpec roundTripWorkloadSpec) {
        this.id = str;
        this.spec = roundTripWorkloadSpec;
    }

    @Override // org.apache.kafka.trogdor.workload.RoundTripWorkerBase
    public void initializeConsumer(HashSet<TopicPartition> hashSet) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.spec.bootstrapServers());
        properties.put("client.id", "consumer." + this.id);
        properties.put("auto.offset.reset", "earliest");
        properties.put("request.timeout.ms", 105000);
        properties.put("max.poll.interval.ms", 100000);
        WorkerUtils.addConfigsToProperties(properties, this.spec.commonClientConf(), this.spec.consumerConf());
        String str = "round-trip-share-group-" + this.id;
        properties.put("group.id", str);
        try {
            Admin createAdminClient = WorkerUtils.createAdminClient(this.spec.bootstrapServers(), this.spec.commonClientConf(), this.spec.adminClientConf());
            try {
                alterShareAutoOffsetReset(str, "earliest", createAdminClient);
                if (createAdminClient != null) {
                    createAdminClient.close();
                }
                this.consumer = new KafkaShareConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
                this.consumer.subscribe(this.spec.activeTopics().materialize().keySet());
            } finally {
            }
        } catch (Exception e) {
            log.warn("Failed to set share.auto.offset.reset config to 'earliest' mode", e);
            throw e;
        }
    }

    @Override // org.apache.kafka.trogdor.workload.RoundTripWorkerBase
    protected ConsumerRecords<byte[], byte[]> fetchRecords(Duration duration) {
        return this.consumer.poll(duration);
    }

    @Override // org.apache.kafka.trogdor.workload.RoundTripWorkerBase
    protected void shutdownConsumer() {
        Utils.closeQuietly(this.consumer, "consumer");
        this.consumer = null;
    }

    private void alterShareAutoOffsetReset(String str, String str2, Admin admin) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, str);
        HashMap hashMap = new HashMap();
        hashMap.put(configResource, List.of(new AlterConfigOp(new ConfigEntry("share.auto.offset.reset", str2), AlterConfigOp.OpType.SET)));
        try {
            admin.incrementalAlterConfigs(hashMap, new AlterConfigsOptions()).all().get(60L, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Exception was thrown while attempting to set share.auto.offset.reset config: ", e);
        }
    }
}
