瀏覽代碼

HADOOP-2391. Clean up job output directory before declaring a job as SUCCESSFUL. Contributed by Amareshwari Sri Ramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@627741 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父節點
當前提交
8a95107fe4

+ 3 - 0
CHANGES.txt

@@ -76,6 +76,9 @@ Release 0.16.1 - Unrelease
     the destination after encountering an error. (Tsz Wo (Nicholas), SZE
     via cdouglas)
 
+    HADOOP-2391. Clean up job output directory before declaring a job as
+    SUCCESSFUL. (Amareshwari Sri Ramadasu via ddas)
+
 Release 0.16.0 - 2008-02-07
 
   INCOMPATIBLE CHANGES

+ 22 - 0
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -30,6 +30,7 @@ import java.util.Vector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
@@ -275,6 +276,18 @@ class JobInProgress {
                                       jobtracker, conf, this);
     }
 
+    // create job specific temporary directory in output path
+    Path outputPath = conf.getOutputPath();
+    if (outputPath != null) {
+      Path tmpDir = new Path(outputPath, "_temporary");
+      FileSystem fileSys = tmpDir.getFileSystem(conf);
+      if (!fileSys.mkdirs(tmpDir)) {
+        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+      }
+    } else {
+      LOG.error("Null Output path");
+    }
+
     this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited = true;
         
@@ -1129,6 +1142,15 @@ class JobInProgress {
       Path tempDir = new Path(conf.getSystemDir(), jobId); 
       fs.delete(tempDir); 
 
+      // delete the temporary directory in output directory
+      Path outputPath = conf.getOutputPath();
+      if (outputPath != null) {
+        Path tmpDir = new Path(outputPath, "_temporary");
+        FileSystem fileSys = tmpDir.getFileSystem(conf);
+        if (fileSys.exists(tmpDir)) {
+          FileUtil.fullyDelete(fileSys, tmpDir);
+        }
+      }
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
     }

+ 37 - 0
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -27,6 +27,7 @@ import java.util.Random;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -112,6 +113,13 @@ class LocalJobRunner implements JobSubmissionProtocol {
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
+        // create job specific temp directory in output path
+        Path tmpDir = new Path(job.getOutputPath(), "_temporary");
+        FileSystem fileSys = tmpDir.getFileSystem(job);
+        if (!fileSys.mkdirs(tmpDir)) {
+          LOG.error("Mkdirs failed to create " + tmpDir.toString());
+        }
+
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
           String mapId = jobId + "_map_" + idFormat.format(i);
@@ -125,6 +133,16 @@ class LocalJobRunner implements JobSubmissionProtocol {
                                     splits[i].getClass().getName(),
                                     split);
           JobConf localConf = new JobConf(job);
+          if (fileSys.exists(tmpDir)) {
+            Path taskTmpDir = new Path(tmpDir, "_" + mapId);
+            if (!fileSys.mkdirs(taskTmpDir)) {
+              throw new IOException("Mkdirs failed to create " 
+                                     + taskTmpDir.toString());
+            }
+          } else {
+            throw new IOException("The directory " + tmpDir.toString()
+                                   + " doesnt exist " );
+          }
           map.localizeConfiguration(localConf);
           map.setConf(localConf);
           map_tasks += 1;
@@ -157,6 +175,16 @@ class LocalJobRunner implements JobSubmissionProtocol {
                                                  "tip_r_0001",
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
+              if (fileSys.exists(tmpDir)) {
+                Path taskTmpDir = new Path(tmpDir, "_" + reduceId);
+                if (!fileSys.mkdirs(taskTmpDir)) {
+                  throw new IOException("Mkdirs failed to create " 
+                                         + taskTmpDir.toString());
+                }
+              } else {
+                throw new IOException("The directory " + tmpDir.toString()
+                                       + " doesnt exist "); 
+              }
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
               reduce_tasks += 1;
@@ -177,6 +205,15 @@ class LocalJobRunner implements JobSubmissionProtocol {
             this.mapoutputFile.removeAll(reduceId);
           }
         }
+        // delete the temporary directory in output directory
+        try {
+          if (fileSys.exists(tmpDir)) {
+            FileUtil.fullyDelete(fileSys, tmpDir);
+          }
+        } catch (IOException e) {
+          LOG.error("Exception in deleting " + tmpDir.toString());
+        }
+
         this.status.setRunState(JobStatus.SUCCEEDED);
 
         JobEndNotifier.localRunnerNotification(job, status);

+ 7 - 2
src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java

@@ -42,7 +42,12 @@ public class MapFileOutputFormat extends OutputFormatBase {
                                       String name, Progressable progress)
     throws IOException {
 
-    Path file = new Path(job.getOutputPath(), name);
+    Path outputPath = job.getOutputPath();
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesnt exist");
+    }
+    Path file = new Path(outputPath, name);
     
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
@@ -58,7 +63,7 @@ public class MapFileOutputFormat extends OutputFormatBase {
     
     // ignore the progress parameter, since MapFile is local
     final MapFile.Writer out =
-      new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
+      new MapFile.Writer(job, fs, file.toString(),
                          job.getOutputKeyClass(),
                          job.getOutputValueClass(),
                          compressionType, codec,

+ 6 - 2
src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java

@@ -40,8 +40,12 @@ public class SequenceFileOutputFormat extends OutputFormatBase {
                                       String name, Progressable progress)
     throws IOException {
 
-    Path file = new Path(job.getOutputPath(), name);
-    FileSystem fs = file.getFileSystem(job);
+    Path outputPath = job.getOutputPath();
+    FileSystem fs = outputPath.getFileSystem(job);
+    if (!fs.exists(outputPath)) {
+      throw new IOException("Output directory doesnt exist");
+    }
+    Path file = new Path(outputPath, name);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {

+ 3 - 2
src/java/org/apache/hadoop/mapred/Task.java

@@ -190,7 +190,8 @@ abstract class Task implements Writable, Configurable {
   public String toString() { return taskId; }
 
   private Path getTaskOutputPath(JobConf conf) {
-    Path p = new Path(conf.getOutputPath(), ("_" + taskId));
+    Path p = new Path(conf.getOutputPath(), ("_temporary" 
+                      + Path.SEPARATOR + "_" + taskId));
     try {
       FileSystem fs = p.getFileSystem(conf);
       return p.makeQualified(fs);
@@ -420,7 +421,7 @@ abstract class Task implements Writable, Configurable {
     if (taskOutputPath != null) {
       FileSystem fs = taskOutputPath.getFileSystem(conf);
       if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent();
+        Path jobOutputPath = taskOutputPath.getParent().getParent();
 
         // Move the task outputs to their final place
         moveTaskOutputs(fs, jobOutputPath, taskOutputPath);

+ 18 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -1423,7 +1423,25 @@ public class TaskTracker
             
       localJobConf.set("mapred.task.id", task.getTaskId());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+
+      // create _taskid directory in output path temporary directory.
+      Path outputPath = localJobConf.getOutputPath();
+      if (outputPath != null) {
+        Path jobTmpDir = new Path(outputPath, "_temporary");
+        FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
+        if (fs.exists(jobTmpDir)) {
+          Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskId());
+          if (!fs.mkdirs(taskTmpDir)) {
+            throw new IOException("Mkdirs failed to create " 
+                                 + taskTmpDir.toString());
+          }
+        } else {
+          throw new IOException("The directory " + jobTmpDir.toString()
+                                 + " doesnt exist "); 
+        }
+      }
       task.localizeConfiguration(localJobConf);
+      
       OutputStream out = localFs.create(localTaskFile);
       try {
         localJobConf.write(out);

+ 3 - 0
src/java/org/apache/hadoop/mapred/TextOutputFormat.java

@@ -108,6 +108,9 @@ public class TextOutputFormat<K extends WritableComparable,
 
     Path dir = job.getOutputPath();
     FileSystem fs = dir.getFileSystem(job);
+    if (!fs.exists(dir)) {
+      throw new IOException("Output directory doesnt exist");
+    }
     boolean isCompressed = getCompressOutput(job);
     if (!isCompressed) {
       FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);

+ 4 - 0
src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

@@ -44,6 +44,10 @@ public class TestTextOutputFormat extends TestCase {
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
     job.setOutputPath(workDir);
+    FileSystem fs = workDir.getFileSystem(job);
+    if (!fs.mkdirs(workDir)) {
+      fail("Failed to create output directory");
+    }
     String file = "test.txt";
     
     // A reporter that does nothing