@@ -35,11 +35,15 @@ import org.apache.hadoop.io.compress.SplitCompressionInputStream;
 import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
+import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup.FrameworkCounter;
 import org.apache.hadoop.util.LineReader;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
@@ -72,8 +76,15 @@ public class LineRecordReader extends RecordReader<LongWritable, Text> {
   public void initialize(InputSplit genericSplit,
                          TaskAttemptContext context) throws IOException {
     FileSplit split = (FileSplit) genericSplit;
-    inputByteCounter = ((MapContext)context).getCounter(
-      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
+    /* TODO This is a hack. MAPREDUCE-2365 is the proper solution */
+    if (context instanceof MapContext) {
+      inputByteCounter =
+        ((MapContext)context).getCounter(
+          FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
+    } else {
+      inputByteCounter = new Counters().findCounter(FileInputFormat.COUNTER_GROUP
+        , FileInputFormat.BYTES_READ);
+    }
     Configuration job = context.getConfiguration();
     this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
     start = split.getStart();
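
For context only (not part of the patch): a minimal Java sketch of the guarded counter lookup the second hunk introduces. Before the change, initialize() unconditionally cast the TaskAttemptContext to MapContext, which throws ClassCastException when the reader is constructed outside a real map task; the patch falls back to a detached org.apache.hadoop.mapred.Counters instance in that case, as a stopgap until MAPREDUCE-2365. The class and method names below (CounterLookupSketch, bytesReadCounter) are invented for illustration, and the sketch assumes LineRecordReader's FileInputFormat is org.apache.hadoop.mapreduce.lib.input.FileInputFormat, whose COUNTER_GROUP and BYTES_READ String constants the patch itself references.

// Sketch only: mirrors the instanceof guard added in initialize().
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

class CounterLookupSketch {
  // Hypothetical helper showing the same fallback logic as the patched initialize().
  static Counter bytesReadCounter(TaskAttemptContext context) {
    if (context instanceof MapContext) {
      // Running inside a real map task: use the framework-tracked counter,
      // so bytes-read increments appear in the job's counters.
      return ((MapContext) context).getCounter(
          FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
    }
    // Any other TaskAttemptContext (the case that previously threw
    // ClassCastException): return a counter from a detached Counters object,
    // so increments are accepted but never reported back to the job.
    return new Counters().findCounter(
        FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
  }
}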