|
@@ -114,10 +114,15 @@ class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
job.setNumReduceTasks(1);
|
|
|
}
|
|
|
// create job specific temp directory in output path
|
|
|
- Path tmpDir = new Path(job.getOutputPath(), "_temporary");
|
|
|
- FileSystem fileSys = tmpDir.getFileSystem(job);
|
|
|
- if (!fileSys.mkdirs(tmpDir)) {
|
|
|
- LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
|
|
+ Path outputPath = job.getOutputPath();
|
|
|
+ FileSystem outputFs = null;
|
|
|
+ Path tmpDir = null;
|
|
|
+ if (outputPath != null) {
|
|
|
+ tmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
|
|
|
+ outputFs = tmpDir.getFileSystem(job);
|
|
|
+ if (!outputFs.mkdirs(tmpDir)) {
|
|
|
+ LOG.error("Mkdirs failed to create " + tmpDir.toString());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
DataOutputBuffer buffer = new DataOutputBuffer();
|
|
@@ -133,15 +138,17 @@ class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
splits[i].getClass().getName(),
|
|
|
split);
|
|
|
JobConf localConf = new JobConf(job);
|
|
|
- if (fileSys.exists(tmpDir)) {
|
|
|
- Path taskTmpDir = new Path(tmpDir, "_" + mapId);
|
|
|
- if (!fileSys.mkdirs(taskTmpDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + taskTmpDir.toString());
|
|
|
- }
|
|
|
- } else {
|
|
|
- throw new IOException("The directory " + tmpDir.toString()
|
|
|
+ if (outputFs != null) {
|
|
|
+ if (outputFs.exists(tmpDir)) {
|
|
|
+ Path taskTmpDir = new Path(tmpDir, "_" + mapId);
|
|
|
+ if (!outputFs.mkdirs(taskTmpDir)) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + taskTmpDir.toString());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IOException("The directory " + tmpDir.toString()
|
|
|
+ " doesnt exist " );
|
|
|
+ }
|
|
|
}
|
|
|
map.localizeConfiguration(localConf);
|
|
|
map.setConf(localConf);
|
|
@@ -175,15 +182,17 @@ class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
"tip_r_0001",
|
|
|
reduceId, 0, mapIds.size());
|
|
|
JobConf localConf = new JobConf(job);
|
|
|
- if (fileSys.exists(tmpDir)) {
|
|
|
- Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
|
|
|
- if (!fileSys.mkdirs(taskTmpDir)) {
|
|
|
- throw new IOException("Mkdirs failed to create "
|
|
|
- + taskTmpDir.toString());
|
|
|
+ if (outputFs != null) {
|
|
|
+ if (outputFs.exists(tmpDir)) {
|
|
|
+ Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
|
|
|
+ if (!outputFs.mkdirs(taskTmpDir)) {
|
|
|
+ throw new IOException("Mkdirs failed to create "
|
|
|
+ + taskTmpDir.toString());
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throw new IOException("The directory " + tmpDir.toString()
|
|
|
+ + " doesnt exist ");
|
|
|
}
|
|
|
- } else {
|
|
|
- throw new IOException("The directory " + tmpDir.toString()
|
|
|
- + " doesnt exist ");
|
|
|
}
|
|
|
reduce.localizeConfiguration(localConf);
|
|
|
reduce.setConf(localConf);
|
|
@@ -207,8 +216,10 @@ class LocalJobRunner implements JobSubmissionProtocol {
|
|
|
}
|
|
|
// delete the temporary directory in output directory
|
|
|
try {
|
|
|
- if (fileSys.exists(tmpDir)) {
|
|
|
- FileUtil.fullyDelete(fileSys, tmpDir);
|
|
|
+ if (outputFs != null) {
|
|
|
+ if (outputFs.exists(tmpDir)) {
|
|
|
+ FileUtil.fullyDelete(outputFs, tmpDir);
|
|
|
+ }
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Exception in deleting " + tmpDir.toString());
|