|
@@ -160,16 +160,20 @@ class MapTask extends Task {
|
|
private RecordReader<K,V> rawIn;
|
|
private RecordReader<K,V> rawIn;
|
|
private Counters.Counter inputByteCounter;
|
|
private Counters.Counter inputByteCounter;
|
|
private Counters.Counter inputRecordCounter;
|
|
private Counters.Counter inputRecordCounter;
|
|
|
|
+ private InputSplit split;
|
|
private TaskReporter reporter;
|
|
private TaskReporter reporter;
|
|
private long beforePos = -1;
|
|
private long beforePos = -1;
|
|
private long afterPos = -1;
|
|
private long afterPos = -1;
|
|
-
|
|
|
|
/**
 * Builds a reader that wraps {@code raw} and keeps the split and job
 * configuration around so that read failures can be reported with the
 * name of the input file.
 *
 * @param split    the input split being read; stored as-is
 * @param job      the job configuration; stored in {@code conf}
 * @param raw      the underlying reader that all reads are delegated to
 * @param reporter source of the MAP_INPUT_RECORDS and MAP_INPUT_BYTES
 *                 counters, also kept for progress reporting
 * @throws IOException declared by the constructor signature
 */
TrackedRecordReader(InputSplit split, JobConf job, RecordReader<K,V> raw,
                    TaskReporter reporter)
  throws IOException {
  this.split = split;
  this.rawIn = raw;
  this.reporter = reporter;
  // Counters are looked up once here rather than on every record.
  this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
  this.inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
  // NOTE(review): 'conf' is not declared in the visible part of this class;
  // it presumably resolves to a field declared elsewhere - confirm.
  conf = job;
}
|
|
|
|
|
|
public K createKey() {
|
|
public K createKey() {
|
|
@@ -196,13 +200,22 @@ class MapTask extends Task {
|
|
|
|
|
|
protected synchronized boolean moveToNext(K key, V value)
|
|
protected synchronized boolean moveToNext(K key, V value)
|
|
throws IOException {
|
|
throws IOException {
|
|
- reporter.setProgress(getProgress());
|
|
|
|
- beforePos = getPos();
|
|
|
|
- boolean ret = rawIn.next(key, value);
|
|
|
|
- afterPos = getPos();
|
|
|
|
|
|
+ boolean ret = false;
|
|
|
|
+ try {
|
|
|
|
+ reporter.setProgress(getProgress());
|
|
|
|
+ beforePos = getPos();
|
|
|
|
+ ret = rawIn.next(key, value);
|
|
|
|
+ afterPos = getPos();
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ if (split instanceof FileSplit) {
|
|
|
|
+ LOG.error("IO error in map input file " + conf.get("map.input.file"));
|
|
|
|
+ throw new IOException("IO error in map input file "
|
|
|
|
+ + conf.get("map.input.file"), ioe);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
public long getPos() throws IOException { return rawIn.getPos(); }
|
|
public long getPos() throws IOException { return rawIn.getPos(); }
|
|
public void close() throws IOException { rawIn.close(); }
|
|
public void close() throws IOException { rawIn.close(); }
|
|
public float getProgress() throws IOException {
|
|
public float getProgress() throws IOException {
|
|
@@ -227,7 +240,7 @@ class MapTask extends Task {
|
|
|
|
|
|
SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
|
|
SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
|
|
TaskReporter reporter) throws IOException{
|
|
TaskReporter reporter) throws IOException{
|
|
- super(raw, reporter);
|
|
|
|
|
|
+ super(null, conf, raw, reporter);
|
|
this.umbilical = umbilical;
|
|
this.umbilical = umbilical;
|
|
this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
|
|
this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
|
|
this.toWriteSkipRecs = toWriteSkipRecs() &&
|
|
this.toWriteSkipRecs = toWriteSkipRecs() &&
|
|
@@ -363,7 +376,7 @@ class MapTask extends Task {
|
|
job.getInputFormat().getRecordReader(inputSplit, job, reporter);
|
|
job.getInputFormat().getRecordReader(inputSplit, job, reporter);
|
|
RecordReader<INKEY,INVALUE> in = isSkipping() ?
|
|
RecordReader<INKEY,INVALUE> in = isSkipping() ?
|
|
new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
|
|
new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
|
|
- new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
|
|
|
|
|
|
+ new TrackedRecordReader<INKEY,INVALUE>(inputSplit, job, rawIn, reporter);
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
|
|
|
|
|
|
|
|
@@ -407,11 +420,14 @@ class MapTask extends Task {
|
|
private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
|
|
private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
|
|
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
|
|
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
|
|
private final TaskReporter reporter;
|
|
private final TaskReporter reporter;
|
|
|
|
+ private org.apache.hadoop.mapreduce.InputSplit inputSplit;
|
|
|
|
|
|
/**
 * Wraps {@code real} and remembers the split it reads from so that read
 * failures can be reported together with the input file.
 *
 * @param split    the input split being read; stored as-is
 * @param real     the underlying reader that reads are delegated to
 * @param reporter source of the MAP_INPUT_RECORDS counter, also kept for
 *                 progress reporting
 */
NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
                        org.apache.hadoop.mapreduce.RecordReader<K,V> real,
                        TaskReporter reporter) {
  this.inputSplit = split;
  this.reporter = reporter;
  this.real = real;
  // The counter is looked up once here rather than on every record.
  this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
}
|
|
|
|
|
|
@@ -444,11 +460,22 @@ class MapTask extends Task {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public boolean nextKeyValue() throws IOException, InterruptedException {
|
|
public boolean nextKeyValue() throws IOException, InterruptedException {
|
|
- boolean result = real.nextKeyValue();
|
|
|
|
- if (result) {
|
|
|
|
- inputRecordCounter.increment(1);
|
|
|
|
|
|
+ boolean result = false;
|
|
|
|
+ try {
|
|
|
|
+ result = real.nextKeyValue();
|
|
|
|
+ if (result) {
|
|
|
|
+ inputRecordCounter.increment(1);
|
|
|
|
+ }
|
|
|
|
+ reporter.setProgress(getProgress());
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ if (inputSplit instanceof FileSplit) {
|
|
|
|
+ FileSplit fileSplit = (FileSplit) inputSplit;
|
|
|
|
+ LOG.error("IO error in map input file "
|
|
|
|
+ + fileSplit.getPath().toString());
|
|
|
|
+ throw new IOException("IO error in map input file "
|
|
|
|
+ + fileSplit.getPath().toString(), ioe);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- reporter.setProgress(getProgress());
|
|
|
|
return result;
|
|
return result;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -604,7 +631,8 @@ class MapTask extends Task {
|
|
|
|
|
|
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
|
|
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
|
|
new NewTrackingRecordReader<INKEY,INVALUE>
|
|
new NewTrackingRecordReader<INKEY,INVALUE>
|
|
- (inputFormat.createRecordReader(split, taskContext), reporter);
|
|
|
|
|
|
+ (split, inputFormat.createRecordReader(split, taskContext),
|
|
|
|
+ reporter);
|
|
|
|
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
job.setBoolean("mapred.skip.on", isSkipping());
|
|
org.apache.hadoop.mapreduce.RecordWriter output = null;
|
|
org.apache.hadoop.mapreduce.RecordWriter output = null;
|