package kafka.etl;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

/* loaded from: input_file:kafka/etl/KafkaETLContext.class */
public class KafkaETLContext {
    static final String CLIENT_BUFFER_SIZE = "client.buffer.size";
    static final String CLIENT_TIMEOUT = "client.so.timeout";
    static final int DEFAULT_BUFFER_SIZE = 1048576;
    static final int DEFAULT_TIMEOUT = 60000;
    protected int _index;
    protected String _input;
    protected KafkaETLRequest _request;
    protected SimpleConsumer _consumer;
    protected long[] _offsetRange;
    protected long _offset;
    protected long _count;
    protected int _retry;
    protected long _requestTime;
    protected long _startTime;
    protected int _bufferSize;
    protected int _timeout;
    protected Reporter _reporter;
    protected MultipleOutputs _mos;
    protected static int MAX_RETRY_TIME = 1;
    static final KafkaETLKey DUMMY_KEY = new KafkaETLKey();
    protected FetchResponse _response = null;
    protected Iterator<MessageAndOffset> _messageIt = null;
    protected Iterator<ByteBufferMessageSet> _respIterator = null;
    protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
    protected FetchRequestBuilder builder = new FetchRequestBuilder();

    public long getTotalBytes() {
        if (this._offsetRange[1] > this._offsetRange[0]) {
            return this._offsetRange[1] - this._offsetRange[0];
        }
        return 0L;
    }

    public long getReadBytes() {
        return this._offset - this._offsetRange[0];
    }

    public long getCount() {
        return this._count;
    }

    public KafkaETLContext(JobConf jobConf, Props props, Reporter reporter, MultipleOutputs multipleOutputs, int i, String str) throws Exception {
        this._input = null;
        this._request = null;
        this._consumer = null;
        this._offsetRange = new long[]{0, 0};
        this._offset = Long.MAX_VALUE;
        this._retry = 0;
        this._requestTime = 0L;
        this._startTime = -1L;
        this._bufferSize = getClientBufferSize(props);
        this._timeout = getClientTimeout(props);
        System.out.println("bufferSize=" + this._bufferSize);
        System.out.println("timeout=" + this._timeout);
        this._reporter = reporter;
        this._mos = multipleOutputs;
        this._index = i;
        this._input = str;
        this._request = new KafkaETLRequest(str.trim());
        URI uri = this._request.getURI();
        this._consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), this._timeout, this._bufferSize, "KafkaETLContext");
        this._offsetRange = getOffsetRange();
        System.out.println("Connected to node " + uri + " beginning reading at offset " + this._offsetRange[0] + " latest offset=" + this._offsetRange[1]);
        this._offset = this._offsetRange[0];
        this._count = 0L;
        this._requestTime = 0L;
        this._retry = 0;
        this._startTime = System.currentTimeMillis();
    }

    public boolean hasMore() {
        return (this._messageIt != null && this._messageIt.hasNext()) || (this._response != null && this._respIterator.hasNext()) || this._offset < this._offsetRange[1];
    }

    public boolean getNext(KafkaETLKey kafkaETLKey, BytesWritable bytesWritable) throws IOException {
        if (!hasMore()) {
            return false;
        }
        boolean z = get(kafkaETLKey, bytesWritable);
        if (this._response != null) {
            while (!z && this._respIterator.hasNext()) {
                this._messageIt = this._respIterator.next().iterator();
                z = get(kafkaETLKey, bytesWritable);
            }
        }
        return z;
    }

    /* JADX WARN: Type inference failed for: r1v12, types: [kafka.etl.KafkaETLContext$1] */
    public boolean fetchMore() throws IOException {
        if (!hasMore()) {
            return false;
        }
        FetchRequest build = this.builder.clientId(this._request.clientId()).addFetch(this._request.getTopic(), this._request.getPartition(), this._offset, this._bufferSize).build();
        long currentTimeMillis = System.currentTimeMillis();
        this._response = this._consumer.fetch(build);
        if (this._response != null) {
            this._respIterator = new ArrayList<ByteBufferMessageSet>() { // from class: kafka.etl.KafkaETLContext.1
                {
                    add(KafkaETLContext.this._response.messageSet(KafkaETLContext.this._request.getTopic(), KafkaETLContext.this._request.getPartition()));
                }
            }.iterator();
        }
        this._requestTime += System.currentTimeMillis() - currentTimeMillis;
        return true;
    }

    public void output(String str) throws IOException {
        String kafkaETLRequest = this._request.toString(this._offset);
        if (this._offsetOut == null) {
            this._offsetOut = this._mos.getCollector("offsets", str + this._index, this._reporter);
        }
        this._offsetOut.collect(DUMMY_KEY, new BytesWritable(kafkaETLRequest.getBytes("UTF-8")));
    }

    public void close() throws IOException {
        if (this._consumer != null) {
            this._consumer.close();
        }
        String topic = this._request.getTopic();
        this._reporter.incrCounter(topic, "read-time(ms)", System.currentTimeMillis() - this._startTime);
        this._reporter.incrCounter(topic, "request-time(ms)", this._requestTime);
        this._reporter.incrCounter(topic, "data-read(mb)", (long) ((this._offset - this._offsetRange[0]) / 1048576.0d));
        this._reporter.incrCounter(topic, "event-count", this._count);
    }

    protected boolean get(KafkaETLKey kafkaETLKey, BytesWritable bytesWritable) throws IOException {
        if (this._messageIt == null || !this._messageIt.hasNext()) {
            return false;
        }
        MessageAndOffset next = this._messageIt.next();
        ByteBuffer buffer = next.message().buffer();
        int remaining = buffer.remaining();
        byte[] bArr = new byte[remaining];
        buffer.get(bArr, buffer.position(), remaining);
        bytesWritable.set(bArr, 0, remaining);
        kafkaETLKey.set(this._index, this._offset, next.message().checksum());
        this._offset = next.nextOffset();
        this._count++;
        return true;
    }

    protected long[] getOffsetRange() throws IOException {
        long[] jArr = new long[2];
        TopicAndPartition topicAndPartition = new TopicAndPartition(this._request.getTopic(), this._request.getPartition());
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
        long[] offsets = this._consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), OffsetRequest.DefaultClientId())).offsets(this._request.getTopic(), this._request.getPartition());
        if (offsets.length != 1) {
            throw new IOException("input:" + this._input + " Expect one smallest offset but get " + offsets.length);
        }
        jArr[0] = offsets[0];
        hashMap.clear();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1));
        long[] offsets2 = this._consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(hashMap, OffsetRequest.CurrentVersion(), OffsetRequest.DefaultClientId())).offsets(this._request.getTopic(), this._request.getPartition());
        if (offsets2.length != 1) {
            throw new IOException("input:" + this._input + " Expect one latest offset but get " + offsets2.length);
        }
        jArr[1] = offsets2[0];
        if (this._request.isValidOffset()) {
            long offset = this._request.getOffset();
            if (offset > jArr[0]) {
                System.out.println("Update starting offset with " + offset);
                jArr[0] = offset;
            } else {
                System.out.println("WARNING: given starting offset " + offset + " is smaller than the smallest one " + jArr[0] + ". Will ignore it.");
            }
        }
        System.out.println("Using offset range [" + jArr[0] + ", " + jArr[1] + "]");
        return jArr;
    }

    public static int getClientBufferSize(Props props) throws Exception {
        return props.getInt(CLIENT_BUFFER_SIZE, Integer.valueOf(DEFAULT_BUFFER_SIZE)).intValue();
    }

    public static int getClientTimeout(Props props) throws Exception {
        return props.getInt(CLIENT_TIMEOUT, Integer.valueOf(DEFAULT_TIMEOUT)).intValue();
    }
}
