package kafka.etl.impl;

import java.net.URI;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Random;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;

/* loaded from: input_file:kafka/etl/impl/DataGenerator.class */
/**
 * Command-line utility that publishes randomly generated events to a Kafka topic and then
 * writes a one-record {@link SequenceFile} ({@code <input>/1.dat}) describing the request.
 *
 * <p>Configuration keys read from the supplied {@link Props}:
 * {@code kafka.etl.topic}, {@code event.count}, {@code input}, {@code kafka.server.uri}
 * and {@code hadoop.job.ugi}.
 */
public class DataGenerator {
    /** Shared random source for event payloads, seeded from the clock at class initialization. */
    protected static final Random RANDOM = new Random(System.currentTimeMillis());

    /** Configuration this generator was created with. */
    protected Props _props;

    /**
     * Producer used to publish the generated events. Left as a raw type so the declared field
     * type is unchanged for any code outside this class that refers to it.
     */
    protected Producer _producer;

    /** Broker address parsed from the {@code kafka.server.uri} property. */
    protected URI _uri;

    /** Topic name taken from the {@code kafka.etl.topic} property. */
    protected String _topic;

    /** Number of events to generate, taken from the {@code event.count} property. */
    protected int _count;

    /** Directory (the {@code input} property) under which {@code 1.dat} is written. */
    protected String _offsetsDir;

    /** Value passed to the producer as {@code send.buffer.bytes}. */
    protected final int TCP_BUFFER_SIZE = 300000;

    /** Value passed to the producer as {@code connect.timeout.ms}. */
    protected final int CONNECT_TIMEOUT = 20000;

    /** Value passed to the producer as {@code reconnect.interval}. */
    protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE;

    /**
     * Reads the generator settings from {@code props} and creates the Kafka producer.
     *
     * @param str   not used by this constructor; kept for signature compatibility
     * @param props configuration source for topic, event count, output directory and broker URI
     * @throws Exception if the broker URI cannot be parsed or a configuration lookup or the
     *                   producer construction fails
     */
    public DataGenerator(String str, Props props) throws Exception {
        this._props = props;
        this._topic = props.getProperty("kafka.etl.topic");
        System.out.println("topics=" + this._topic);
        this._count = props.getInt("event.count");
        this._offsetsDir = this._props.getProperty("input");
        this._uri = new URI(this._props.getProperty("kafka.server.uri"));
        System.out.println("server uri:" + this._uri.toString());

        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list",
                String.format("%s:%d", this._uri.getHost(), this._uri.getPort()));
        // Use the declared constants instead of repeating their literal values.
        producerProps.put("send.buffer.bytes", String.valueOf(TCP_BUFFER_SIZE));
        producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
        producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
        this._producer = new Producer(new ProducerConfig(producerProps));
    }

    /**
     * Builds {@code _count} messages whose payload is the decimal text of a non-negative random
     * long, sends them in one batch, closes the producer, and then writes the offsets file.
     *
     * @throws Exception if sending the batch or writing the offsets file fails
     */
    @SuppressWarnings("unchecked") // _producer is a raw type, so send(List) is an unchecked call
    public void run() throws Exception {
        ArrayList<KeyedMessage<Object, byte[]>> messages =
                new ArrayList<KeyedMessage<Object, byte[]>>();
        for (int i = 0; i < this._count; i++) {
            // Clear the sign bit rather than negating: -Long.MIN_VALUE overflows and stays
            // negative, whereas the mask can never yield a negative value.
            long value = RANDOM.nextLong() & Long.MAX_VALUE;
            byte[] payload = Long.toString(value).getBytes("UTF8");
            messages.add(new KeyedMessage<Object, byte[]>(this._topic, null, payload));
        }
        System.out.println(" send " + messages.size() + " " + this._topic + " count events to " + this._uri);
        try {
            this._producer.send(messages);
        } finally {
            // Release the producer even when the send fails.
            this._producer.close();
        }
        generateOffsets();
    }

    /**
     * Writes {@code <_offsetsDir>/1.dat}, replacing any existing path of that name, as an
     * uncompressed {@link SequenceFile} holding a single record: a default {@link KafkaETLKey}
     * and the UTF-8 bytes of the request's string form.
     *
     * @throws Exception if the file system cannot be accessed or the file cannot be written
     */
    protected void generateOffsets() throws Exception {
        JobConf jobConf = new JobConf();
        // Forward the setting only when it is configured; JobConf.set rejects a null value.
        String ugi = this._props.getProperty("hadoop.job.ugi");
        if (ugi != null) {
            jobConf.set("hadoop.job.ugi", ugi);
        }
        jobConf.setCompressMapOutput(false);

        Path outputPath = new Path(this._offsetsDir + "/1.dat");
        FileSystem fileSystem = outputPath.getFileSystem(jobConf);
        if (fileSystem.exists(outputPath)) {
            // Explicit form of the deprecated single-argument delete.
            fileSystem.delete(outputPath, true);
        }

        // NOTE(review): the trailing 0 is presumably the partition number; confirm against
        // KafkaETLRequest.
        String brokerUri = "tcp://" + this._uri.getHost() + ":" + this._uri.getPort();
        KafkaETLRequest request = new KafkaETLRequest(this._topic, brokerUri, 0);
        System.out.println("Dump " + request.toString() + " to " + outputPath.toUri().toString());
        byte[] requestBytes = request.toString().getBytes("UTF-8");

        KafkaETLKey key = new KafkaETLKey();
        SequenceFile.setCompressionType(jobConf, SequenceFile.CompressionType.NONE);
        SequenceFile.Writer writer = SequenceFile.createWriter(
                fileSystem, jobConf, outputPath, KafkaETLKey.class, BytesWritable.class);
        try {
            writer.append(key, new BytesWritable(requestBytes));
        } finally {
            // Close the writer even when the append fails.
            writer.close();
        }
    }

    /**
     * Entry point: loads the configuration file named by the first argument and runs the
     * generator.
     *
     * @param strArr command-line arguments; the first element is the configuration file path
     * @throws Exception if no argument is given, or if setup or generation fails
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            throw new IllegalArgumentException("Usage: - config_file");
        }
        new DataGenerator("DataGenerator", new Props(strArr[0])).run();
    }
}
