|
@@ -54,6 +54,7 @@ import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
import org.apache.log4j.LogManager;
|
|
@@ -236,11 +237,33 @@ class YarnChild {
|
|
|
job.setStrings(MRConfig.LOCAL_DIR, localSysDirs);
|
|
|
LOG.info(MRConfig.LOCAL_DIR + " for child: " + job.get(MRConfig.LOCAL_DIR));
|
|
|
LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
|
|
- Path workDir = lDirAlloc.getLocalPathForWrite("work", job);
|
|
|
- FileSystem lfs = FileSystem.getLocal(job).getRaw();
|
|
|
- if (!lfs.mkdirs(workDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + workDir.toString());
|
|
|
+ Path workDir = null;
|
|
|
+ // First, try to find the JOB_LOCAL_DIR on this host.
|
|
|
+ try {
|
|
|
+ workDir = lDirAlloc.getLocalPathToRead("work", job);
|
|
|
+ } catch (DiskErrorException e) {
|
|
|
+ // DiskErrorException means dir not found. If not found, it will
|
|
|
+ // be created below.
|
|
|
+ }
|
|
|
+ if (workDir == null) {
|
|
|
+ // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
|
|
|
+ workDir = lDirAlloc.getLocalPathForWrite("work", job);
|
|
|
+ FileSystem lfs = FileSystem.getLocal(job).getRaw();
|
|
|
+ boolean madeDir = false;
|
|
|
+ try {
|
|
|
+ madeDir = lfs.mkdirs(workDir);
|
|
|
+ } catch (FileAlreadyExistsException e) {
|
|
|
+ // Since all tasks will be running in their own JVM, the race condition
|
|
|
+ // exists where multiple tasks could be trying to create this directory
|
|
|
+ // at the same time. If this task loses the race, it's okay because
|
|
|
+ // the directory already exists.
|
|
|
+ madeDir = true;
|
|
|
+ workDir = lDirAlloc.getLocalPathToRead("work", job);
|
|
|
+ }
|
|
|
+ if (!madeDir) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + workDir.toString());
|
|
|
+ }
|
|
|
}
|
|
|
job.set(MRJobConfig.JOB_LOCAL_DIR,workDir.toString());
|
|
|
}
|