|
@@ -28,9 +28,11 @@ import org.apache.hadoop.io.LongWritable;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.Text;
|
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
|
import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
|
|
|
+import org.apache.hadoop.mapreduce.Counter;
|
|
import org.apache.hadoop.mapreduce.InputSplit;
|
|
import org.apache.hadoop.mapreduce.InputSplit;
|
|
import org.apache.hadoop.mapreduce.RecordReader;
|
|
import org.apache.hadoop.mapreduce.RecordReader;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
|
+import org.apache.hadoop.mapreduce.MapContext;
|
|
import org.apache.hadoop.util.LineReader;
|
|
import org.apache.hadoop.util.LineReader;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
@@ -49,10 +51,13 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
private int maxLineLength;
|
|
private int maxLineLength;
|
|
private LongWritable key = null;
|
|
private LongWritable key = null;
|
|
private Text value = null;
|
|
private Text value = null;
|
|
|
|
+ private Counter inputByteCounter;
|
|
|
|
|
|
public void initialize(InputSplit genericSplit,
|
|
public void initialize(InputSplit genericSplit,
|
|
TaskAttemptContext context) throws IOException {
|
|
TaskAttemptContext context) throws IOException {
|
|
FileSplit split = (FileSplit) genericSplit;
|
|
FileSplit split = (FileSplit) genericSplit;
|
|
|
|
+ inputByteCounter = ((MapContext)context).getCounter(
|
|
|
|
+ FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
|
|
Configuration job = context.getConfiguration();
|
|
Configuration job = context.getConfiguration();
|
|
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
|
|
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
|
|
Integer.MAX_VALUE);
|
|
Integer.MAX_VALUE);
|
|
@@ -101,6 +106,7 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
pos += newSize;
|
|
pos += newSize;
|
|
|
|
+ inputByteCounter.increment(newSize);
|
|
if (newSize < maxLineLength) {
|
|
if (newSize < maxLineLength) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|