Browse Source

Fix HADOOP-116. Clean up job submission files. On job completion, remove the directory containing the submitted job.xml file, since JobClient always creates a new directory to hold this file.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@392353 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
50db53a372

+ 13 - 27
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -48,7 +48,6 @@ class JobInProgress {
 
     long startTime;
     long finishTime;
-    String deleteUponCompletion = null;
 
     private JobConf conf;
     boolean tasksInited = false;
@@ -84,15 +83,6 @@ class JobInProgress {
 
         this.numMapTasks = conf.getNumMapTasks();
         this.numReduceTasks = conf.getNumReduceTasks();
-
-        //
-        // If a jobFile is in the systemDir, we can delete it (and
-        // its JAR) upon completion
-        //
-        File systemDir = conf.getSystemDir();
-        if (jobFile.startsWith(systemDir.getPath())) {
-            this.deleteUponCompletion = jobFile;
-        }
     }
 
     /**
@@ -423,6 +413,7 @@ class JobInProgress {
         if (status.getRunState() == JobStatus.RUNNING && allDone) {
             this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
             this.finishTime = System.currentTimeMillis();
+            garbageCollect();
         }
     }
 
@@ -443,6 +434,8 @@ class JobInProgress {
             for (int i = 0; i < reduces.length; i++) {
                 reduces[i].kill();
             }
+
+            garbageCollect();
         }
     }
 
@@ -475,11 +468,8 @@ class JobInProgress {
      * from all tables.  Be sure to remove all of this job's tasks
      * from the various tables.
      */
-    public synchronized void garbageCollect() throws IOException {
-        //
-        // Remove this job from all tables
-        //
-
+    synchronized void garbageCollect() {
+      try {
         // Definitely remove the local-disk copy of the job file
         if (localJobFile != null) {
             localJobFile.delete();
@@ -490,17 +480,13 @@ class JobInProgress {
             localJarFile = null;
         }
 
-        //
-        // If the job file was in the temporary system directory,
-        // we should delete it upon garbage collect.
-        //
-        if (deleteUponCompletion != null) {
-            JobConf jd = new JobConf(deleteUponCompletion);
-            FileSystem fs = FileSystem.get(conf);
-            fs.delete(new File(jd.getJar()));
-            fs.delete(new File(deleteUponCompletion));
-            deleteUponCompletion = null;
-        }
+        // JobClient always creates a new directory with job files
+        // so we remove that directory to cleanup
+        FileSystem fs = FileSystem.get(conf);
+        fs.delete(new File(profile.getJobFile()).getParentFile());
+
+      } catch (IOException e) {
+        LOG.warning("Error cleaning up "+profile.getJobId()+": "+e);
+      }
     }
 }
-

+ 15 - 3
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -45,6 +45,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
     private ArrayList mapIds = new ArrayList();
     private MapOutputFile mapoutputFile;
     private JobProfile profile;
+    private File localFile;
+    private FileSystem localFs;
 
     public Job(String file, Configuration conf) throws IOException {
       this.file = file;
@@ -52,7 +54,9 @@ class LocalJobRunner implements JobSubmissionProtocol {
       this.mapoutputFile = new MapOutputFile();
       this.mapoutputFile.setConf(conf);
 
-      File localFile = new JobConf(conf).getLocalFile("localRunner", id+".xml");
+      this.localFile = new JobConf(conf).getLocalFile("localRunner", id+".xml");
+      this.localFs = FileSystem.getNamed("local", conf);
+
       fs.copyToLocalFile(new File(file), localFile);
       this.job = new JobConf(localFile);
       profile = new JobProfile(job.getUser(), id, file, 
@@ -103,7 +107,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
           File mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
           File reduceIn = this.mapoutputFile.getInputFile(mapId, reduceId);
           reduceIn.getParentFile().mkdirs();
-          if (!FileSystem.getNamed("local", this.job).rename(mapOut, reduceIn))
+          if (!localFs.rename(mapOut, reduceIn))
             throw new IOException("Couldn't rename " + mapOut);
           this.mapoutputFile.removeAll(mapId);
         }
@@ -126,7 +130,15 @@ class LocalJobRunner implements JobSubmissionProtocol {
 
       } catch (Throwable t) {
         this.status.runState = JobStatus.FAILED;
-        t.printStackTrace();
+        LOG.log(Level.WARNING, id, t);
+
+      } finally {
+        try {
+          fs.delete(new File(file).getParentFile()); // delete submit dir
+          localFs.delete(localFile);              // delete local copy
+        } catch (IOException e) {
+          LOG.warning("Error cleaning up "+id+": "+e);
+        }
       }
     }