Browse Source

HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class outside the TaskTracker and makes it a generic class that is used by the JobTracker also for deleting the paths on the job's output fs. (3) Moves the references to completedJobStore outside the block where the JobTracker is locked. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@746902 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
c8ec07814d

+ 7 - 0
CHANGES.txt

@@ -834,6 +834,13 @@ Release 0.20.0 - Unreleased
     HADOOP-5282. Fixed job history logs for task attempts that are failed by the
     HADOOP-5282. Fixed job history logs for task attempts that are failed by the
     JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
     JobTracker, say due to lost task trackers. (Amar Kamat via yhemanth)
 
 
+    HADOOP-5285. Fixes the issues - (1) obtainTaskCleanupTask checks whether job is
+    inited before trying to lock the JobInProgress (2) Moves the CleanupQueue class
+    outside the TaskTracker and makes it a generic class that is used by the 
+    JobTracker also for deleting the paths on the job's output fs. (3) Moves the
+    references to completedJobStore outside the block where the JobTracker is locked.
+    (ddas)
+
 Release 0.19.1 - Unreleased
 Release 0.19.1 - Unreleased
 
 
   IMPROVEMENTS
   IMPROVEMENTS

+ 29 - 27
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -978,35 +978,39 @@ class JobInProgress {
   /*
   /*
    * Return task cleanup attempt if any, to run on a given tracker
    * Return task cleanup attempt if any, to run on a given tracker
    */
    */
-  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
                                                  boolean isMapSlot)
   throws IOException {
   throws IOException {
-    if (this.status.getRunState() != JobStatus.RUNNING || 
-        jobFailed || jobKilled) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!tasksInited.get()) {
       return null;
       return null;
     }
     }
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    if (isMapSlot) {
-      if (!mapCleanupTasks.isEmpty()) {
-        taskid = mapCleanupTasks.remove(0);
-        tip = maps[taskid.getTaskID().getId()];
+    synchronized (this) {
+      if (this.status.getRunState() != JobStatus.RUNNING || 
+          jobFailed || jobKilled) {
+        return null;
       }
       }
-    } else {
-      if (!reduceCleanupTasks.isEmpty()) {
-        taskid = reduceCleanupTasks.remove(0);
-        tip = reduces[taskid.getTaskID().getId()];
+      String taskTracker = tts.getTrackerName();
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
       }
       }
+      TaskAttemptID taskid = null;
+      TaskInProgress tip = null;
+      if (isMapSlot) {
+        if (!mapCleanupTasks.isEmpty()) {
+          taskid = mapCleanupTasks.remove(0);
+          tip = maps[taskid.getTaskID().getId()];
+        }
+      } else {
+        if (!reduceCleanupTasks.isEmpty()) {
+          taskid = reduceCleanupTasks.remove(0);
+          tip = reduces[taskid.getTaskID().getId()];
+        }
+      }
+      if (tip != null) {
+        return tip.addRunningTask(taskid, taskTracker, true);
+      }
+      return null;
     }
     }
-    if (tip != null) {
-      return tip.addRunningTask(taskid, taskTracker, true);
-    }
-    return null;
   }
   }
   
   
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
@@ -1111,9 +1115,6 @@ class JobInProgress {
    * @return true/false
    * @return true/false
    */
    */
   private synchronized boolean canLaunchJobCleanupTask() {
   private synchronized boolean canLaunchJobCleanupTask() {
-    if (!tasksInited.get()) {
-      return false;
-    }
     // check if the job is running
     // check if the job is running
     if (status.getRunState() != JobStatus.RUNNING &&
     if (status.getRunState() != JobStatus.RUNNING &&
         status.getRunState() != JobStatus.PREP) {
         status.getRunState() != JobStatus.PREP) {
@@ -2444,6 +2445,7 @@ class JobInProgress {
    */
    */
   synchronized void garbageCollect() {
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
     // Let the JobTracker know that a job is complete
+    jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
     jobtracker.finalizeJob(this);
       
       
     try {
     try {
@@ -2467,8 +2469,7 @@ class JobInProgress {
       // Delete temp dfs dirs created if any, like in case of 
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
       // speculative exn of reduces.  
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
       Path tempDir = new Path(jobtracker.getSystemDir(), jobId.toString());
-      FileSystem fs = tempDir.getFileSystem(conf);
-      fs.delete(tempDir, true); 
+      new CleanupQueue().addToQueue(conf,tempDir); 
     } catch (IOException e) {
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
     }
@@ -2479,6 +2480,7 @@ class JobInProgress {
     this.runningMapCache = null;
     this.runningMapCache = null;
     this.nonRunningReduces = null;
     this.nonRunningReduces = null;
     this.runningReduces = null;
     this.runningReduces = null;
+
   }
   }
 
 
   /**
   /**

+ 35 - 32
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1823,9 +1823,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   synchronized void finalizeJob(JobInProgress job) {
   synchronized void finalizeJob(JobInProgress job) {
     // Mark the 'non-running' tasks for pruning
     // Mark the 'non-running' tasks for pruning
     markCompletedJob(job);
     markCompletedJob(job);
-
-    //persists the job info in DFS
-    completedJobStatusStore.store(job);
     
     
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
 
 
@@ -2856,34 +2853,41 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     setJobPriority(jobid, newPriority);
     setJobPriority(jobid, newPriority);
   }
   }
                            
                            
-  public synchronized JobProfile getJobProfile(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getProfile();
-    } else {
-      return completedJobStatusStore.readJobProfile(jobid);
+  void storeCompletedJob(JobInProgress job) {
+    //persists the job info in DFS
+    completedJobStatusStore.store(job);
+  }
+
+  public JobProfile getJobProfile(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getProfile();
+      } 
     }
     }
+    return completedJobStatusStore.readJobProfile(jobid);
   }
   }
-  public synchronized JobStatus getJobStatus(JobID jobid) {
+  public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
       return null;
       return null;
     }
     }
-    
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getStatus();
-    } else {
-      return completedJobStatusStore.readJobStatus(jobid);
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getStatus();
+      } 
     }
     }
+    return completedJobStatusStore.readJobStatus(jobid);
   }
   }
-  public synchronized Counters getJobCounters(JobID jobid) {
-    JobInProgress job = jobs.get(jobid);
-    if (job != null) {
-      return job.getCounters();
-    } else {
-      return completedJobStatusStore.readCounters(jobid);
+  public Counters getJobCounters(JobID jobid) {
+    synchronized (this) {
+      JobInProgress job = jobs.get(jobid);
+      if (job != null) {
+        return job.getCounters();
+      } 
     }
     }
+    return completedJobStatusStore.readCounters(jobid);
   }
   }
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
     JobInProgress job = jobs.get(jobid);
@@ -2981,18 +2985,17 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events = EMPTY_EVENTS;
-
-    JobInProgress job = this.jobs.get(jobid);
-    if (null != job) {
-      if (job.inited()) {
-        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.inited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
       }
       }
     }
     }
-    else {
-      events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
-    }
-    return events;
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
   }
 
 
   /**
   /**

+ 8 - 44
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -1000,9 +1000,7 @@ public class TaskTracker
   private void startCleanupThreads() throws IOException {
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue(originalConf);
-    directoryCleanupThread.setDaemon(true);
-    directoryCleanupThread.start();
+    directoryCleanupThread = new CleanupQueue();
   }
   }
   
   
   /**
   /**
@@ -1450,7 +1448,7 @@ public class TaskTracker
         // Delete the job directory for this  
         // Delete the job directory for this  
         // task if the job is done/failed
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
         if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
+          directoryCleanupThread.addToQueue(fConf, getLocalFiles(fConf, 
             getLocalJobDir(rjob.getJobID().toString())));
             getLocalJobDir(rjob.getJobID().toString())));
         }
         }
         // Remove this job 
         // Remove this job 
@@ -2513,17 +2511,20 @@ public class TaskTracker
             //might be using the dir. The JVM running the tasks would clean
             //might be using the dir. The JVM running the tasks would clean
             //the workdir per a task in the task process itself.
             //the workdir per a task in the task process itself.
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir));
                   taskDir));
             }  
             }  
             
             
             else {
             else {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                 taskDir+"/job.xml"));
                 taskDir+"/job.xml"));
             }
             }
           } else {
           } else {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
             if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(getLocalFiles(defaultJobConf,
+              directoryCleanupThread.addToQueue(defaultJobConf,
+                  getLocalFiles(defaultJobConf,
                   taskDir+"/work"));
                   taskDir+"/work"));
             }  
             }  
           }
           }
@@ -3052,43 +3053,6 @@ public class TaskTracker
     return paths;
     return paths;
   }
   }
 
 
-  // cleanup queue which deletes files/directories of the paths queued up.
-  private static class CleanupQueue extends Thread {
-    private LinkedBlockingQueue<Path> queue = new LinkedBlockingQueue<Path>();
-    private JobConf conf;
-    
-    public CleanupQueue(JobConf conf) throws IOException{
-      setName("Directory/File cleanup thread");
-      setDaemon(true);
-      this.conf = conf;
-    }
-
-    public void addToQueue(Path... paths) {
-      for (Path p : paths) {
-        try {
-          queue.put(p);
-        } catch (InterruptedException ie) {}
-      }
-      return;
-    }
-
-    public void run() {
-      LOG.debug("cleanup thread started");
-      Path path = null;
-      while (true) {
-        try {
-          path = queue.take();
-          // delete the path.
-          FileSystem fs = path.getFileSystem(conf);
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.info("Error deleting path" + path);
-        } catch (InterruptedException t) {
-        }
-      }
-    }
-  }
-
   int getMaxCurrentMapTasks() {
   int getMaxCurrentMapTasks() {
     return maxCurrentMapTasks;
     return maxCurrentMapTasks;
   }
   }