package kafka.bridge.pig;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import kafka.bridge.hadoop.KafkaOutputFormat;
import kafka.bridge.hadoop.KafkaRecordWriter;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.piggybank.storage.avro.PigAvroDatumWriter;
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;

/* loaded from: input_file:kafka/bridge/pig/AvroKafkaStorage.class */
/**
 * Pig {@link StoreFunc} that serializes each tuple with Avro binary encoding
 * and hands the resulting bytes to a {@link KafkaRecordWriter}.
 *
 * <p>The Avro schema is supplied as a string to the constructor and may be
 * replaced by {@link #checkSchema(ResourceSchema)}. Subclasses can prepend
 * data to every message by overriding
 * {@link #writeEnvelope(OutputStream, Encoder)}.
 */
public class AvroKafkaStorage extends StoreFunc {
    /** Destination writer; assigned in {@link #prepareToWrite(RecordWriter)}. */
    protected KafkaRecordWriter<Object, byte[]> writer;
    /** Avro schema used to build the datum writer. */
    protected Schema avroSchema;
    /** Writes Pig tuples through {@link #encoder}; created in {@link #prepareToWrite(RecordWriter)}. */
    protected PigAvroDatumWriter datumWriter;
    /** Binary encoder layered over {@link #os}. */
    protected Encoder encoder;
    /** Reusable buffer holding the encoded bytes of the tuple currently being written. */
    protected ByteArrayOutputStream os;

    /**
     * Creates a storage function for the given Avro schema.
     *
     * @param str textual Avro schema, parsed with {@link Schema#parse(String)}
     */
    public AvroKafkaStorage(String str) {
        this.avroSchema = Schema.parse(str);
    }

    /**
     * @return a new {@link KafkaOutputFormat} instance
     */
    public OutputFormat getOutputFormat() throws IOException {
        return new KafkaOutputFormat();
    }

    /**
     * Returns the store location unchanged; no resolution against {@code path}
     * is performed.
     *
     * @param str  the store location as given by the caller
     * @param path the current directory (ignored)
     * @return {@code str}, unmodified
     */
    public String relToAbsPathForStoreLocation(String str, Path path) throws IOException {
        return str;
    }

    /**
     * Registers the store location as the output path of the job.
     *
     * @param str the store location, wrapped in a {@link Path}
     * @param job the job whose output path is set
     */
    public void setStoreLocation(String str, Job job) throws IOException {
        KafkaOutputFormat.setOutputPath(job, new Path(str));
    }

    /**
     * Initializes the per-writer state: the Kafka record writer, the Avro datum
     * writer, the output buffer and the binary encoder on top of it.
     *
     * @param recordWriter the writer to send messages to; cast to
     *                     {@link KafkaRecordWriter}
     * @throws IllegalStateException if no Avro schema is set
     */
    public void prepareToWrite(RecordWriter recordWriter) throws IOException {
        if (this.avroSchema == null) {
            throw new IllegalStateException("avroSchema shouldn't be null");
        }
        this.writer = (KafkaRecordWriter) recordWriter;
        this.datumWriter = new PigAvroDatumWriter(this.avroSchema);
        this.os = new ByteArrayOutputStream();
        this.encoder = new BinaryEncoder(this.os);
    }

    /** Intentionally a no-op: this implementation performs no cleanup on failure. */
    public void cleanupOnFailure(String str, Job job) throws IOException {
    }

    /** Intentionally a no-op: the UDF context signature is not used by this class. */
    public void setStoreFuncUDFContextSignature(String str) {
    }

    /**
     * Replaces the current Avro schema with the result of
     * {@link PigSchema2Avro#validateAndConvert} applied to the current schema
     * and the given Pig schema.
     *
     * @param resourceSchema the Pig schema of the data being stored
     */
    public void checkSchema(ResourceSchema resourceSchema) throws IOException {
        this.avroSchema = PigSchema2Avro.validateAndConvert(this.avroSchema, resourceSchema);
    }

    /**
     * Extension hook invoked by {@link #putNext(Tuple)} after the buffer has
     * been reset and before the tuple is encoded. The default implementation
     * writes nothing.
     *
     * @param outputStream the buffer the message is being assembled in
     * @param encoder      the encoder writing to that buffer
     */
    protected void writeEnvelope(OutputStream outputStream, Encoder encoder) throws IOException {
    }

    /**
     * Encodes one tuple and sends it to the record writer with a {@code null}
     * key.
     *
     * @param tuple the tuple to serialize
     * @throws IOException if encoding fails, or if the write is interrupted
     *                     (the interrupt status of the thread is restored and
     *                     the {@link InterruptedException} is kept as the cause)
     */
    public void putNext(Tuple tuple) throws IOException {
        // The buffer is shared across calls; clear out the previous message first.
        this.os.reset();
        writeEnvelope(this.os, this.encoder);
        this.datumWriter.write(tuple, this.encoder);
        // Push any bytes still held by the encoder into the buffer before copying it.
        this.encoder.flush();
        try {
            this.writer.write(null, this.os.toByteArray());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe the cancellation after we translate the exception.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while writing record to Kafka", e);
        }
    }
}
