package kafka.etl.impl;

import kafka.etl.KafkaETLInputFormat;
import kafka.etl.KafkaETLJob;
import kafka.etl.Props;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;

/* loaded from: input_file:kafka/etl/impl/SimpleKafkaETLJob.class */
public class SimpleKafkaETLJob {
    protected String _name;
    protected Props _props;
    protected String _input;
    protected String _output;
    protected String _topic;

    public SimpleKafkaETLJob(String str, Props props) throws Exception {
        this._name = str;
        this._props = props;
        this._input = this._props.getProperty("input");
        this._output = this._props.getProperty("output");
        this._topic = props.getProperty("kafka.etl.topic");
    }

    protected JobConf createJobConf() throws Exception {
        JobConf createJobConf = KafkaETLJob.createJobConf("SimpleKafakETL", this._topic, this._props, getClass());
        createJobConf.setMapperClass(SimpleKafkaETLMapper.class);
        KafkaETLInputFormat.setInputPaths(createJobConf, new Path[]{new Path(this._input)});
        createJobConf.setOutputKeyClass(LongWritable.class);
        createJobConf.setOutputValueClass(Text.class);
        createJobConf.setOutputFormat(TextOutputFormat.class);
        TextOutputFormat.setCompressOutput(createJobConf, false);
        Path path = new Path(this._output);
        FileSystem fileSystem = path.getFileSystem(createJobConf);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path);
        }
        TextOutputFormat.setOutputPath(createJobConf, path);
        createJobConf.setNumReduceTasks(0);
        return createJobConf;
    }

    public void execute() throws Exception {
        JobConf createJobConf = createJobConf();
        RunningJob submitJob = new JobClient(createJobConf).submitJob(createJobConf);
        String jobID = submitJob.getJobID();
        System.out.println("Hadoop job id=" + jobID);
        submitJob.waitForCompletion();
        if (!submitJob.isSuccessful()) {
            throw new Exception("Hadoop ETL job failed! Please check status on http://" + createJobConf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + jobID);
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            throw new Exception("Usage: - config_file");
        }
        new SimpleKafkaETLJob("SimpleKafkaETLJob", new Props(strArr[0])).execute();
    }
}
