@@ -45,7 +45,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -53,6 +55,7 @@ import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.IFile.Reader;
 import org.apache.hadoop.mapred.Merger.Segment;
+import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -143,19 +146,14 @@ class MapTask extends Task {
     private RecordReader<K,V> rawIn;
     private Counters.Counter inputByteCounter;
     private Counters.Counter inputRecordCounter;
-    private Iterator<Long> skipFailedRecIndexIterator;
-    private TaskUmbilicalProtocol umbilical;
-    private long recIndex = -1;
     private long beforePos = -1;
     private long afterPos = -1;
 
-    TrackedRecordReader(RecordReader<K,V> raw, Counters counters,
-        TaskUmbilicalProtocol umbilical) {
+    TrackedRecordReader(RecordReader<K,V> raw, Counters counters)
+      throws IOException{
       rawIn = raw;
-      this.umbilical = umbilical;
       inputRecordCounter = counters.findCounter(MAP_INPUT_RECORDS);
       inputByteCounter = counters.findCounter(MAP_INPUT_BYTES);
-      skipFailedRecIndexIterator = getFailedRanges().skipRangeIterator();
     }
 
     public K createKey() {
@@ -169,28 +167,22 @@ class MapTask extends Task {
     public synchronized boolean next(K key, V value)
     throws IOException {
       boolean ret = moveToNext(key, value);
-      if(isSkipping() && ret) {
-        long nextRecIndex = skipFailedRecIndexIterator.next();
-        long skip = nextRecIndex - recIndex;
-        for(int i=0;i<skip && ret;i++) {
-          ret = moveToNext(key, value);
-        }
-        getCounters().incrCounter(Counter.MAP_SKIPPED_RECORDS, skip);
-        reportNextRecordRange(umbilical, nextRecIndex);
-      }
       if (ret) {
-        inputRecordCounter.increment(1);
-        inputByteCounter.increment(afterPos - beforePos);
+        incrCounters();
       }
       return ret;
     }
+
+    protected void incrCounters() {
+      inputRecordCounter.increment(1);
+      inputByteCounter.increment(afterPos - beforePos);
+    }
 
-    private synchronized boolean moveToNext(K key, V value)
+    protected synchronized boolean moveToNext(K key, V value)
       throws IOException {
       setProgress(getProgress());
       beforePos = getPos();
       boolean ret = rawIn.next(key, value);
-      recIndex++;
       afterPos = getPos();
       return ret;
     }
@@ -202,6 +194,68 @@ class MapTask extends Task {
     }
   }
 
+  /**
+   * This class skips the records based on the failed ranges from previous
+   * attempts.
+   */
+  class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
+    private SkipRangeIterator skipIt;
+    private SequenceFile.Writer skipWriter;
+    private TaskUmbilicalProtocol umbilical;
+    private Counters.Counter skipRecCounter;
+    private long recIndex = -1;
+
+    SkippingRecordReader(RecordReader<K,V> raw, Counters counters,
+        TaskUmbilicalProtocol umbilical) throws IOException{
+      super(raw,counters);
+      this.umbilical = umbilical;
+      this.skipRecCounter = counters.findCounter(Counter.MAP_SKIPPED_RECORDS);
+      skipIt = getFailedRanges().skipRangeIterator();
+    }
+
+    public synchronized boolean next(K key, V value)
+    throws IOException {
+      boolean ret = moveToNext(key, value);
+      long nextRecIndex = skipIt.next();
+      long skip = nextRecIndex - recIndex;
+      for(int i=0;i<skip && ret;i++) {
+        writeSkippedRec(key, value);
+        ret = moveToNext(key, value);
+      }
+      //close the skip writer once all the ranges are skipped
+      if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
+        skipWriter.close();
+      }
+      skipRecCounter.increment(skip);
+      reportNextRecordRange(umbilical, nextRecIndex);
+      if (ret) {
+        incrCounters();
+      }
+      return ret;
+    }
+
+    protected synchronized boolean moveToNext(K key, V value)
+      throws IOException {
+      recIndex++;
+      return super.moveToNext(key, value);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeSkippedRec(K key, V value) throws IOException{
+      if(skipWriter==null) {
+        Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
+        Path skipFile = new Path(skipDir, getTaskID().toString());
+        skipWriter =
+          SequenceFile.createWriter(
+              skipFile.getFileSystem(conf), conf, skipFile,
+              (Class<K>) createKey().getClass(),
+              (Class<V>) createValue().getClass(),
+              CompressionType.BLOCK, getReporter(umbilical));
+      }
+      skipWriter.append(key, value);
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
@@ -244,7 +298,9 @@ class MapTask extends Task {
 
     RecordReader rawIn =                  // open input
       job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
-    RecordReader in = new TrackedRecordReader(rawIn, getCounters(), umbilical);
+    RecordReader in = isSkipping() ?
+        new SkippingRecordReader(rawIn, getCounters(), umbilical) :
+        new TrackedRecordReader(rawIn, getCounters());
    job.setBoolean("mapred.skip.on", isSkipping());
 
    MapRunnable runner =
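
Note (not part of the patch): the records dropped by SkippingRecordReader.writeSkippedRec are persisted as a block-compressed SequenceFile named after the task attempt, under the job's skip output path. The sketch below shows one way to read such a file back for inspection; the class name, the command-line path argument, and the assumption that keys/values are Writable are illustrative, not taken from this change.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/** Illustrative dump of a skipped-records SequenceFile; assumes Writable keys/values. */
public class SkipFileDump {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Path of one skip file, e.g. <skip output dir>/<task id> (hypothetical argument).
    Path skipFile = new Path(args[0]);
    FileSystem fs = skipFile.getFileSystem(conf);
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, skipFile, conf);
    try {
      // Key and value classes are recorded in the SequenceFile header.
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {   // iterate the skipped records
        System.out.println(key + "\t" + value);
      }
    } finally {
      reader.close();
    }
  }
}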