package kafka.bridge.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import kafka.common.KafkaException;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.log4j.Logger;

/* loaded from: input_file:kafka/bridge/hadoop/KafkaOutputFormat.class */
public class KafkaOutputFormat<K, V> extends OutputFormat<K, V> {
    private Logger log = Logger.getLogger(KafkaOutputFormat.class);
    public static final String KAFKA_URL = "kafka.output.url";
    public static final int KAFKA_QUEUE_BYTES = 1000000;
    public static final String KAFKA_CONFIG_PREFIX = "kafka.output";
    private static final Map<String, String> kafkaConfigMap;

    public static void setOutputPath(Job job, Path path) {
        job.getConfiguration().set(KAFKA_URL, path.toString());
        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
    }

    public static Path getOutputPath(JobContext jobContext) {
        String str = jobContext.getConfiguration().get(KAFKA_URL);
        if (str == null) {
            return null;
        }
        return new Path(str);
    }

    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new FileOutputCommitter(new Path("/tmp/" + taskAttemptContext.getTaskAttemptID().getJobID().toString()), taskAttemptContext);
    }

    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        Path outputPath = getOutputPath(taskAttemptContext);
        if (outputPath == null) {
            throw new KafkaException("no kafka output url specified");
        }
        URI create = URI.create(outputPath.toString());
        Configuration configuration = taskAttemptContext.getConfiguration();
        Properties properties = new Properties();
        properties.putAll(kafkaConfigMap);
        Iterator it = configuration.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            if (((String) entry.getKey()).startsWith(KAFKA_CONFIG_PREFIX) && !((String) entry.getKey()).equals(KAFKA_URL)) {
                properties.setProperty(((String) entry.getKey()).substring(KAFKA_CONFIG_PREFIX.length() + 1), (String) entry.getValue());
            }
        }
        for (Map.Entry<K, V> entry2 : properties.entrySet()) {
            configuration.set("kafka.output." + entry2.getKey().toString(), entry2.getValue().toString());
        }
        int i = configuration.getInt("kafka.output.queue.bytes", KAFKA_QUEUE_BYTES);
        if (!create.getScheme().equals("kafka")) {
            throw new KafkaException("missing scheme from kafka uri (must be kafka://)");
        }
        String authority = create.getAuthority();
        properties.setProperty("metadata.broker.list", authority);
        configuration.set("kafka.output.metadata.broker.list", authority);
        if (create.getPath() == null || create.getPath().length() <= 1) {
            throw new KafkaException("no topic specified in kafka uri");
        }
        String substring = create.getPath().substring(1);
        configuration.set("kafka.output.topic", substring);
        this.log.info(String.format("using kafka broker %s (topic %s)", authority, substring));
        return new KafkaRecordWriter(new Producer(new ProducerConfig(properties)), substring, i);
    }

    static {
        HashMap hashMap = new HashMap();
        hashMap.put("producer.type", "sync");
        hashMap.put("compression.codec", Integer.toString(1));
        hashMap.put("request.required.acks", Integer.toString(1));
        kafkaConfigMap = Collections.unmodifiableMap(hashMap);
    }
}
