|
@@ -35,6 +35,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
|
|
|
import org.apache.commons.logging.*;
|
|
|
import org.apache.hadoop.metrics.Metrics;
|
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.fs.*;
|
|
|
import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
|
|
|
|
|
@@ -341,6 +342,7 @@ class MapTask extends Task {
|
|
|
throws IOException {
|
|
|
synchronized (this) {
|
|
|
writer.append(key, value);
|
|
|
+ reportProgress(umbilical);
|
|
|
}
|
|
|
}
|
|
|
};
|
|
@@ -366,6 +368,7 @@ class MapTask extends Task {
|
|
|
while (values.more()) {
|
|
|
combiner.reduce(values.getKey(), values, combineCollector, reporter);
|
|
|
values.nextKey();
|
|
|
+ reportProgress(umbilical);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -393,6 +396,7 @@ class MapTask extends Task {
|
|
|
value.readFields(valIn);
|
|
|
|
|
|
writer.append(key, value);
|
|
|
+ reportProgress(umbilical);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -425,62 +429,87 @@ class MapTask extends Task {
|
|
|
compressionType, codec);
|
|
|
finalIndexOut.writeLong(segmentStart);
|
|
|
finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
|
|
|
+ reportProgress(umbilical);
|
|
|
}
|
|
|
finalOut.close();
|
|
|
finalIndexOut.close();
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- Path [] filename = new Path[numSpills];
|
|
|
- Path [] indexFileName = new Path[numSpills];
|
|
|
- FSDataInputStream in[] = new FSDataInputStream[numSpills];
|
|
|
- FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
|
|
|
-
|
|
|
- for(int i = 0; i < numSpills; i++) {
|
|
|
- filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
|
|
|
- in[i] = localFs.open(filename[i]);
|
|
|
- indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
|
|
|
- indexIn[i] = localFs.open(indexFileName[i]);
|
|
|
- }
|
|
|
-
|
|
|
- //create a sorter object as we need access to the SegmentDescriptor
|
|
|
- //class and merge methods
|
|
|
- Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
|
|
|
- sorter.setFactor(numSpills);
|
|
|
-
|
|
|
- for (int parts = 0; parts < partitions; parts++){
|
|
|
- List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
|
|
|
+ //spawn a thread to give merge progress heartbeats
|
|
|
+ Thread sortProgress = new Thread() {
|
|
|
+ public void run() {
|
|
|
+ // Heartbeat until interrupted; sleeping first keeps a failing report from busy-looping.
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(PROGRESS_INTERVAL); // sortProgress.interrupt() ends the loop here
|
|
|
+ reportProgress(umbilical);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ return;
|
|
|
+ } catch (Throwable e) { // best effort: log, then retry after the next sleep
|
|
|
+ LOG.info("Thread Exception in " +
|
|
|
+ "reporting sort progress\n" +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ sortProgress.setName("Sort progress reporter for task "+getTaskId());
|
|
|
+ sortProgress.setDaemon(true);
|
|
|
+ sortProgress.start();
|
|
|
+ try {
|
|
|
+ Path [] filename = new Path[numSpills];
|
|
|
+ Path [] indexFileName = new Path[numSpills];
|
|
|
+ FSDataInputStream in[] = new FSDataInputStream[numSpills];
|
|
|
+ FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
|
|
|
+
|
|
|
for(int i = 0; i < numSpills; i++) {
|
|
|
- long segmentOffset = indexIn[i].readLong();
|
|
|
- long segmentLength = indexIn[i].readLong();
|
|
|
- SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
|
|
|
- segmentLength, filename[i]);
|
|
|
- s.preserveInput(true);
|
|
|
- s.doSync();
|
|
|
- segmentList.add(i, s);
|
|
|
+ filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
|
|
|
+ in[i] = localFs.open(filename[i]);
|
|
|
+ indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
|
|
|
+ indexIn[i] = localFs.open(indexFileName[i]);
|
|
|
}
|
|
|
- segmentStart = finalOut.getPos();
|
|
|
- SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
|
|
|
- job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
|
|
|
- compressionType, codec);
|
|
|
- sorter.writeFile(sorter.merge(segmentList), writer);
|
|
|
- //add a sync block - required esp. for block compression to ensure
|
|
|
- //partition data don't span partition boundaries
|
|
|
- writer.sync();
|
|
|
- //when we write the offset/length to the final index file, we write
|
|
|
- //longs for both. This helps us to reliably seek directly to the
|
|
|
- //offset/length for a partition when we start serving the byte-ranges
|
|
|
- //to the reduces. We probably waste some space in the file by doing
|
|
|
- //this as opposed to writing VLong but it helps us later on.
|
|
|
- finalIndexOut.writeLong(segmentStart);
|
|
|
- finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
|
|
|
- }
|
|
|
- finalOut.close();
|
|
|
- finalIndexOut.close();
|
|
|
- //cleanup
|
|
|
- for(int i = 0; i < numSpills; i++) {
|
|
|
- in[i].close(); localFs.delete(filename[i]);
|
|
|
- indexIn[i].close(); localFs.delete(indexFileName[i]);
|
|
|
+
|
|
|
+ //create a sorter object as we need access to the SegmentDescriptor
|
|
|
+ //class and merge methods
|
|
|
+ Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
|
|
|
+ sorter.setFactor(numSpills);
|
|
|
+
|
|
|
+ for (int parts = 0; parts < partitions; parts++){
|
|
|
+ List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
|
|
|
+ for(int i = 0; i < numSpills; i++) {
|
|
|
+ long segmentOffset = indexIn[i].readLong();
|
|
|
+ long segmentLength = indexIn[i].readLong();
|
|
|
+ SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
|
|
|
+ segmentLength, filename[i]);
|
|
|
+ s.preserveInput(true);
|
|
|
+ s.doSync();
|
|
|
+ segmentList.add(i, s);
|
|
|
+ }
|
|
|
+ segmentStart = finalOut.getPos();
|
|
|
+ SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
|
|
|
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
|
|
|
+ compressionType, codec);
|
|
|
+ sorter.writeFile(sorter.merge(segmentList), writer);
|
|
|
+ //add a sync block - required esp. for block compression to ensure
|
|
|
+ //partition data don't span partition boundaries
|
|
|
+ writer.sync();
|
|
|
+ //when we write the offset/length to the final index file, we write
|
|
|
+ //longs for both. This helps us to reliably seek directly to the
|
|
|
+ //offset/length for a partition when we start serving the byte-ranges
|
|
|
+ //to the reduces. We probably waste some space in the file by doing
|
|
|
+ //this as opposed to writing VLong but it helps us later on.
|
|
|
+ finalIndexOut.writeLong(segmentStart);
|
|
|
+ finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
|
|
|
+ }
|
|
|
+ finalOut.close();
|
|
|
+ finalIndexOut.close();
|
|
|
+ //cleanup
|
|
|
+ for(int i = 0; i < numSpills; i++) {
|
|
|
+ in[i].close(); localFs.delete(filename[i]);
|
|
|
+ indexIn[i].close(); localFs.delete(indexFileName[i]);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ sortProgress.interrupt();
|
|
|
}
|
|
|
}
|
|
|
|