|
@@ -61,6 +61,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
|
|
import org.apache.hadoop.util.Progress;
|
|
import org.apache.hadoop.util.Progress;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
|
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
|
|
|
import static org.apache.hadoop.mapred.Task.Counter.*;
|
|
import static org.apache.hadoop.mapred.Task.Counter.*;
|
|
|
|
|
|
@@ -265,7 +266,14 @@ class ReduceTask extends Task {
|
|
// the list of files to merge, otherwise not.
|
|
// the list of files to merge, otherwise not.
|
|
List<Path> mapFilesList = new ArrayList<Path>();
|
|
List<Path> mapFilesList = new ArrayList<Path>();
|
|
for(int i=0; i < numMaps; i++) {
|
|
for(int i=0; i < numMaps; i++) {
|
|
- Path f = mapOutputFile.getInputFile(i, getTaskId());
|
|
|
|
|
|
+ Path f;
|
|
|
|
+ try {
|
|
|
|
+ //catch and ignore DiskErrorException, since some map outputs will
|
|
|
|
+ //really be absent (inmem merge).
|
|
|
|
+ f = mapOutputFile.getInputFile(i, getTaskId());
|
|
|
|
+ } catch (DiskErrorException d) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
if (lfs.exists(f))
|
|
if (lfs.exists(f))
|
|
mapFilesList.add(f);
|
|
mapFilesList.add(f);
|
|
}
|
|
}
|
|
@@ -292,7 +300,7 @@ class ReduceTask extends Task {
|
|
};
|
|
};
|
|
sortProgress.setName("Sort progress reporter for task "+getTaskId());
|
|
sortProgress.setName("Sort progress reporter for task "+getTaskId());
|
|
|
|
|
|
- Path tempDir = job.getLocalPath(getTaskId());
|
|
|
|
|
|
+ Path tempDir = new Path(getTaskId());
|
|
|
|
|
|
WritableComparator comparator = job.getOutputValueGroupingComparator();
|
|
WritableComparator comparator = job.getOutputValueGroupingComparator();
|
|
|
|
|
|
@@ -496,6 +504,11 @@ class ReduceTask extends Task {
|
|
|
|
|
|
private Random random = null;
|
|
private Random random = null;
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * The maximum size of the merge output from ramfs (set to MAX_INMEM_FILESYS_USE * inMemFileSys.getFSSize()).
|
|
|
|
+ */
|
|
|
|
+ private long ramfsMergeOutputSize;
|
|
|
|
+
|
|
/** Represents the result of an attempt to copy a map output */
|
|
/** Represents the result of an attempt to copy a map output */
|
|
private class CopyResult {
|
|
private class CopyResult {
|
|
|
|
|
|
@@ -523,6 +536,15 @@ class ReduceTask extends Task {
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
public MapOutputLocation getLocation() { return loc; }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private int extractMapIdFromPathName(Path pathname) {
|
|
|
|
+ //Parses the integer map id out of pathname.getName(); all paths passed here are expected to end with map_<id>.out (e.g. "map_12.out" yields 12)
|
|
|
|
+ String firstPathName = pathname.getName();
|
|
|
|
+ int beginIndex = firstPathName.lastIndexOf("map_"); //start of the last "map_" marker; -1 if the marker is absent
|
|
|
|
+ int endIndex = firstPathName.lastIndexOf(".out"); //start of the last ".out" suffix; -1 if the suffix is absent
|
|
|
|
+ return Integer.parseInt(firstPathName.substring(beginIndex +
|
|
|
|
+ "map_".length(), endIndex)); //NOTE(review): no guard for names that do not match map_<id>.out; substring/parseInt would throw an unchecked exception - confirm callers only pass such paths
|
|
|
|
+ }
|
|
|
|
+
|
|
private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
|
|
private Thread createProgressThread(final TaskUmbilicalProtocol umbilical) {
|
|
//spawn a thread to give copy progress heartbeats
|
|
//spawn a thread to give copy progress heartbeats
|
|
Thread copyProgress = new Thread() {
|
|
Thread copyProgress = new Thread() {
|
|
@@ -645,14 +667,17 @@ class ReduceTask extends Task {
|
|
String reduceId = reduceTask.getTaskId();
|
|
String reduceId = reduceTask.getTaskId();
|
|
LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
|
|
LOG.info(reduceId + " Copying " + loc.getMapTaskId() +
|
|
" output from " + loc.getHost() + ".");
|
|
" output from " + loc.getHost() + ".");
|
|
- // the place where the file should end up
|
|
|
|
- Path finalFilename = conf.getLocalPath(reduceId + "/map_" +
|
|
|
|
- loc.getMapId() + ".out");
|
|
|
|
|
|
+ // a temp filename. If this file gets created in ramfs, we're fine,
|
|
|
|
+ // else, we will check the localFS to find a suitable final location
|
|
|
|
+ // for this path
|
|
|
|
+ Path filename = new Path("/" + reduceId + "/map_" +
|
|
|
|
+ loc.getMapId() + ".out");
|
|
// a working filename that will be unique to this attempt
|
|
// a working filename that will be unique to this attempt
|
|
- Path tmpFilename = new Path(finalFilename + "-" + id);
|
|
|
|
|
|
+ Path tmpFilename = new Path(filename + "-" + id);
|
|
// this copies the map output file
|
|
// this copies the map output file
|
|
tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
|
|
tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
|
|
- tmpFilename, reduceTask.getPartition(),
|
|
|
|
|
|
+ tmpFilename, lDirAlloc,
|
|
|
|
+ conf, reduceTask.getPartition(),
|
|
STALLED_COPY_TIMEOUT);
|
|
STALLED_COPY_TIMEOUT);
|
|
if (!neededOutputs.contains(loc.getMapId())) {
|
|
if (!neededOutputs.contains(loc.getMapId())) {
|
|
if (tmpFilename != null) {
|
|
if (tmpFilename != null) {
|
|
@@ -662,7 +687,7 @@ class ReduceTask extends Task {
|
|
return CopyResult.OBSOLETE;
|
|
return CopyResult.OBSOLETE;
|
|
}
|
|
}
|
|
if (tmpFilename == null)
|
|
if (tmpFilename == null)
|
|
- throw new IOException("File " + finalFilename + "-" + id +
|
|
|
|
|
|
+ throw new IOException("File " + filename + "-" + id +
|
|
" not created");
|
|
" not created");
|
|
long bytes = -1;
|
|
long bytes = -1;
|
|
// lock the ReduceTask while we do the rename
|
|
// lock the ReduceTask while we do the rename
|
|
@@ -676,9 +701,12 @@ class ReduceTask extends Task {
|
|
}
|
|
}
|
|
|
|
|
|
bytes = fs.getLength(tmpFilename);
|
|
bytes = fs.getLength(tmpFilename);
|
|
|
|
+ //resolve the final filename against the directory where the tmpFile
|
|
|
|
+ //got created
|
|
|
|
+ filename = new Path(tmpFilename.getParent(), filename.getName());
|
|
// if we can't rename the file, something is broken (and IOException
|
|
// if we can't rename the file, something is broken (and IOException
|
|
// will be thrown).
|
|
// will be thrown).
|
|
- if (!fs.rename(tmpFilename, finalFilename)) {
|
|
|
|
|
|
+ if (!fs.rename(tmpFilename, filename)) {
|
|
fs.delete(tmpFilename);
|
|
fs.delete(tmpFilename);
|
|
bytes = -1;
|
|
bytes = -1;
|
|
throw new IOException("failure to rename map output " +
|
|
throw new IOException("failure to rename map output " +
|
|
@@ -766,6 +794,8 @@ class ReduceTask extends Task {
|
|
inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
|
|
inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
|
|
LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
|
|
LOG.info(reduceTask.getTaskId() + " Created an InMemoryFileSystem, uri: "
|
|
+ uri);
|
|
+ uri);
|
|
|
|
+ ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
|
|
|
|
+ inMemFileSys.getFSSize());
|
|
localFileSys = FileSystem.getLocal(conf);
|
|
localFileSys = FileSystem.getLocal(conf);
|
|
//create an instance of the sorter
|
|
//create an instance of the sorter
|
|
sorter =
|
|
sorter =
|
|
@@ -1025,9 +1055,12 @@ class ReduceTask extends Task {
|
|
//it is not guaranteed that this file will be present after merge
|
|
//it is not guaranteed that this file will be present after merge
|
|
//is called (we delete empty sequence files as soon as we see them
|
|
//is called (we delete empty sequence files as soon as we see them
|
|
//in the merge method)
|
|
//in the merge method)
|
|
|
|
+ int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
|
|
|
|
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
|
|
|
|
+ reduceTask.getTaskId(), ramfsMergeOutputSize);
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
|
|
|
+ localFileSys.makeQualified(outputPath), null);
|
|
|
|
|
|
SequenceFile.Sorter.RawKeyValueIterator rIter = null;
|
|
SequenceFile.Sorter.RawKeyValueIterator rIter = null;
|
|
try {
|
|
try {
|
|
@@ -1046,7 +1079,7 @@ class ReduceTask extends Task {
|
|
LOG.info(reduceTask.getTaskId() +
|
|
LOG.info(reduceTask.getTaskId() +
|
|
" Merge of the " +inMemClosedFiles.length +
|
|
" Merge of the " +inMemClosedFiles.length +
|
|
" files in InMemoryFileSystem complete." +
|
|
" files in InMemoryFileSystem complete." +
|
|
- " Local file is " + inMemClosedFiles[0]);
|
|
|
|
|
|
+ " Local file is " + outputPath);
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.warn(reduceTask.getTaskId() +
|
|
LOG.warn(reduceTask.getTaskId() +
|
|
" Final merge of the inmemory files threw an exception: " +
|
|
" Final merge of the inmemory files threw an exception: " +
|
|
@@ -1151,9 +1184,16 @@ class ReduceTask extends Task {
|
|
//it is not guaranteed that this file will be present after merge
|
|
//it is not guaranteed that this file will be present after merge
|
|
//is called (we delete empty sequence files as soon as we see them
|
|
//is called (we delete empty sequence files as soon as we see them
|
|
//in the merge method)
|
|
//in the merge method)
|
|
|
|
+
|
|
|
|
+ //figure out the mapId
|
|
|
|
+ int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
|
|
|
|
+
|
|
|
|
+ Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
|
|
|
|
+ reduceTask.getTaskId(), ramfsMergeOutputSize);
|
|
|
|
+
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
SequenceFile.Writer writer = sorter.cloneFileAttributes(
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
inMemFileSys.makeQualified(inMemClosedFiles[0]),
|
|
- localFileSys.makeQualified(inMemClosedFiles[0]), null);
|
|
|
|
|
|
+ localFileSys.makeQualified(outputPath), null);
|
|
SequenceFile.Sorter.RawKeyValueIterator rIter;
|
|
SequenceFile.Sorter.RawKeyValueIterator rIter;
|
|
try {
|
|
try {
|
|
rIter = sorter.merge(inMemClosedFiles, true,
|
|
rIter = sorter.merge(inMemClosedFiles, true,
|
|
@@ -1162,7 +1202,7 @@ class ReduceTask extends Task {
|
|
//make sure that we delete the ondisk file that we created
|
|
//make sure that we delete the ondisk file that we created
|
|
//earlier when we invoked cloneFileAttributes
|
|
//earlier when we invoked cloneFileAttributes
|
|
writer.close();
|
|
writer.close();
|
|
- localFileSys.delete(inMemClosedFiles[0]);
|
|
|
|
|
|
+ localFileSys.delete(outputPath);
|
|
throw new IOException (StringUtils.stringifyException(e));
|
|
throw new IOException (StringUtils.stringifyException(e));
|
|
}
|
|
}
|
|
sorter.writeFile(rIter, writer);
|
|
sorter.writeFile(rIter, writer);
|
|
@@ -1170,7 +1210,7 @@ class ReduceTask extends Task {
|
|
LOG.info(reduceTask.getTaskId() +
|
|
LOG.info(reduceTask.getTaskId() +
|
|
" Merge of the " +inMemClosedFiles.length +
|
|
" Merge of the " +inMemClosedFiles.length +
|
|
" files in InMemoryFileSystem complete." +
|
|
" files in InMemoryFileSystem complete." +
|
|
- " Local file is " + inMemClosedFiles[0]);
|
|
|
|
|
|
+ " Local file is " + outputPath);
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
|
|
LOG.info(reduceTask.getTaskId() + " Nothing to merge from " +
|