|
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
|
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.fs.FSError;
|
|
@@ -437,43 +438,6 @@ public class LocalContainerLauncher extends AbstractService implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Within the _local_ filesystem (not HDFS), all activity takes place within
|
|
|
- * a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
|
|
|
- * and all sub-MapTasks create the same filename ("file.out"). Rename that
|
|
|
- * to something unique (e.g., "map_0.out") to avoid collisions.
|
|
|
- *
|
|
|
- * Longer-term, we'll modify [something] to use TaskAttemptID-based
|
|
|
- * filenames instead of "file.out". (All of this is entirely internal,
|
|
|
- * so there are no particular compatibility issues.)
|
|
|
- */
|
|
|
- private MapOutputFile renameMapOutputForReduce(JobConf conf,
|
|
|
- TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
|
|
|
- FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
- // move map output to reduce input
|
|
|
- Path mapOut = subMapOutputFile.getOutputFile();
|
|
|
- FileStatus mStatus = localFs.getFileStatus(mapOut);
|
|
|
- Path reduceIn = subMapOutputFile.getInputFileForWrite(
|
|
|
- TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
|
|
|
- Path mapOutIndex = new Path(mapOut.toString() + ".index");
|
|
|
- Path reduceInIndex = new Path(reduceIn.toString() + ".index");
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Renaming map output file for task attempt "
|
|
|
- + mapId.toString() + " from original location " + mapOut.toString()
|
|
|
- + " to destination " + reduceIn.toString());
|
|
|
- }
|
|
|
- if (!localFs.mkdirs(reduceIn.getParent())) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + reduceIn.getParent().toString());
|
|
|
- }
|
|
|
- if (!localFs.rename(mapOut, reduceIn))
|
|
|
- throw new IOException("Couldn't rename " + mapOut);
|
|
|
- if (!localFs.rename(mapOutIndex, reduceInIndex))
|
|
|
- throw new IOException("Couldn't rename " + mapOutIndex);
|
|
|
-
|
|
|
- return new RenamedMapOutputFile(reduceIn);
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Also within the local filesystem, we need to restore the initial state
|
|
|
* of the directory as much as possible. Compare current contents against
|
|
@@ -506,7 +470,46 @@ public class LocalContainerLauncher extends AbstractService implements
|
|
|
}
|
|
|
|
|
|
} // end EventHandler
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Within the _local_ filesystem (not HDFS), all activity takes place within
|
|
|
+ * a subdir inside one of the LOCAL_DIRS
|
|
|
+ * (${local.dir}/usercache/$user/appcache/$appId/$contId/),
|
|
|
+ * and all sub-MapTasks create the same filename ("file.out"). Rename that
|
|
|
+ * to something unique (e.g., "map_0.out") to avoid possible collisions.
|
|
|
+ *
|
|
|
+ * Longer-term, we'll modify [something] to use TaskAttemptID-based
|
|
|
+ * filenames instead of "file.out". (All of this is entirely internal,
|
|
|
+ * so there are no particular compatibility issues.)
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
|
|
|
+ TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
|
|
|
+ FileSystem localFs = FileSystem.getLocal(conf);
|
|
|
+ // move map output to reduce input
|
|
|
+ Path mapOut = subMapOutputFile.getOutputFile();
|
|
|
+ FileStatus mStatus = localFs.getFileStatus(mapOut);
|
|
|
+ Path reduceIn = subMapOutputFile.getInputFileForWrite(
|
|
|
+ TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
|
|
|
+ Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
|
|
|
+ Path reduceInIndex = new Path(reduceIn.toString() + ".index");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Renaming map output file for task attempt "
|
|
|
+ + mapId.toString() + " from original location " + mapOut.toString()
|
|
|
+ + " to destination " + reduceIn.toString());
|
|
|
+ }
|
|
|
+ if (!localFs.mkdirs(reduceIn.getParent())) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + reduceIn.getParent().toString());
|
|
|
+ }
|
|
|
+ if (!localFs.rename(mapOut, reduceIn))
|
|
|
+ throw new IOException("Couldn't rename " + mapOut);
|
|
|
+ if (!localFs.rename(mapOutIndex, reduceInIndex))
|
|
|
+ throw new IOException("Couldn't rename " + mapOutIndex);
|
|
|
+
|
|
|
+ return new RenamedMapOutputFile(reduceIn);
|
|
|
+ }
|
|
|
+
|
|
|
private static class RenamedMapOutputFile extends MapOutputFile {
|
|
|
private Path path;
|
|
|
|