|
@@ -280,13 +280,13 @@ class MapTask extends Task {
|
|
|
private Counters.Counter skipRecCounter;
|
|
|
private long recIndex = -1;
|
|
|
|
|
|
- SkippingRecordReader(TaskUmbilicalProtocol umbilical, TaskReporter reporter)
|
|
|
- throws IOException {
|
|
|
- super(null, conf, reporter);
|
|
|
+ SkippingRecordReader(InputSplit split, TaskUmbilicalProtocol umbilical,
|
|
|
+ TaskReporter reporter) throws IOException {
|
|
|
+ super(split, conf, reporter);
|
|
|
this.umbilical = umbilical;
|
|
|
this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
|
|
|
this.toWriteSkipRecs = toWriteSkipRecs() &&
|
|
|
- SkipBadRecords.getSkipOutputPath(conf)!=null;
|
|
|
+ SkipBadRecords.getSkipOutputPath(conf)!=null;
|
|
|
skipIt = getSkipRanges().skipRangeIterator();
|
|
|
}
|
|
|
|
|
@@ -415,7 +415,7 @@ class MapTask extends Task {
|
|
|
reporter.setInputSplit(inputSplit);
|
|
|
|
|
|
RecordReader<INKEY,INVALUE> in = isSkipping() ?
|
|
|
- new SkippingRecordReader<INKEY,INVALUE>(umbilical, reporter) :
|
|
|
+ new SkippingRecordReader<INKEY,INVALUE>(inputSplit, umbilical, reporter) :
|
|
|
new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, reporter);
|
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
|
|