Browse Source

HADOOP-4236. Ensure un-initialized jobs are killed correctly on user-demand. Contributed by Sharad Agarwal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@704989 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 16 years ago
parent
commit
1c0066ba57

+ 3 - 0
CHANGES.txt

@@ -921,6 +921,9 @@ Release 0.19.0 - Unreleased
     determine whether to canonicalize file paths or not.
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-4236. Ensure un-initialized jobs are killed correctly on
+    user-demand. (Sharad Agarwal via acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/EagerTaskInitializationListener.java

@@ -58,7 +58,7 @@ class EagerTaskInitializationListener extends JobInProgressListener {
           LOG.error("Job initialization failed:\n" +
                     StringUtils.stringifyException(t));
           if (job != null) {
-            job.terminateJob(JobStatus.FAILED);
+            job.fail();
           }
         }
       }

+ 133 - 60
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -142,6 +142,7 @@ class JobInProgress {
 
   private JobConf conf;
   AtomicBoolean tasksInited = new AtomicBoolean(false);
+  private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
   private JobID jobId;
@@ -340,6 +341,12 @@ class JobInProgress {
     if (tasksInited.get()) {
       return;
     }
+    synchronized(jobInitKillStatus){
+      if(jobInitKillStatus.killed) {
+        return;
+      }
+      jobInitKillStatus.initStarted = true;
+    }
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
@@ -450,6 +457,15 @@ class JobInProgress {
                        numReduceTasks + 1, jobtracker, conf, this);
     setup[1].setSetupTask();
     
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+      if(jobInitKillStatus.killed) {
+        //setup not launched so directly terminate
+        terminateJob(JobStatus.KILLED);
+        return;
+      }
+    }
+    
     tasksInited.set(true);
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
@@ -881,40 +897,47 @@ class JobInProgress {
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public synchronized Task obtainCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainCleanupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if (!canLaunchCleanupTask()) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    // Update the last-known clusterSize
-    this.clusterSize = clusterSize;
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if(!tasksInited.get()) {
       return null;
     }
     
-    List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
-    if (isMapSlot) {
-      cleanupTaskList.add(cleanup[0]);
-    } else {
-      cleanupTaskList.add(cleanup[1]);
-    }
-    TaskInProgress tip = findTaskFromList(cleanupTaskList,
-                           tts, numUniqueHosts, false);
-    if (tip == null) {
-      return null;
+    synchronized(this) {
+      if (!canLaunchCleanupTask()) {
+        return null;
+      }
+      
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        cleanupTaskList.add(cleanup[0]);
+      } else {
+        cleanupTaskList.add(cleanup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(cleanupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the cleanupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+      }
+      return result;
     }
     
-    // Now launch the cleanupTask
-    Task result = tip.getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-    }
-    return result;
   }
   
   /**
@@ -956,40 +979,46 @@ class JobInProgress {
    * Return a SetupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public synchronized Task obtainSetupTask(TaskTrackerStatus tts, 
+  public Task obtainSetupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if (!canLaunchSetupTask()) {
-      return null;
-    }
-    
-    String taskTracker = tts.getTrackerName();
-    // Update the last-known clusterSize
-    this.clusterSize = clusterSize;
-    if (!shouldRunOnTaskTracker(taskTracker)) {
-      return null;
-    }
-    
-    List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
-    if (isMapSlot) {
-      setupTaskList.add(setup[0]);
-    } else {
-      setupTaskList.add(setup[1]);
-    }
-    TaskInProgress tip = findTaskFromList(setupTaskList,
-                           tts, numUniqueHosts, false);
-    if (tip == null) {
+    if(!tasksInited.get()) {
       return null;
     }
     
-    // Now launch the setupTask
-    Task result = tip.getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+    synchronized(this) {
+      if (!canLaunchSetupTask()) {
+        return null;
+      }
+      
+      String taskTracker = tts.getTrackerName();
+      // Update the last-known clusterSize
+      this.clusterSize = clusterSize;
+      if (!shouldRunOnTaskTracker(taskTracker)) {
+        return null;
+      }
+      
+      List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
+      if (isMapSlot) {
+        setupTaskList.add(setup[0]);
+      } else {
+        setupTaskList.add(setup[1]);
+      }
+      TaskInProgress tip = findTaskFromList(setupTaskList,
+                             tts, numUniqueHosts, false);
+      if (tip == null) {
+        return null;
+      }
+      
+      // Now launch the setupTask
+      Task result = tip.getTaskToRun(tts.getTrackerName());
+      if (result != null) {
+        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
+      }
+      return result;
     }
-    return result;
   }
   
   /**
@@ -1883,7 +1912,7 @@ class JobInProgress {
     }
   }
   
-  synchronized void terminateJob(int jobTerminationState) {
+  private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
       if (jobTerminationState == JobStatus.FAILED) {
@@ -1909,12 +1938,33 @@ class JobInProgress {
 
   /**
    * Terminate the job and all its component tasks.
+   * Calling this will lead to marking the job as failed/killed. Cleanup 
+   * tip will be launched. If the job has not inited, it will directly call 
+   * terminateJob as there is no need to launch cleanup tip.
+   * This method is reentrant.
    * @param jobTerminationState job termination state
    */
   private synchronized void terminate(int jobTerminationState) {
+    if(!tasksInited.get()) {
+    	//init could not be done, we just terminate directly.
+      terminateJob(jobTerminationState);
+      return;
+    }
+
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
+      if (jobTerminationState == JobStatus.FAILED) {
+        if(jobFailed) {//reentrant
+          return;
+        }
+        jobFailed = true;
+      } else if (jobTerminationState == JobStatus.KILLED) {
+        if(jobKilled) {//reentrant
+          return;
+        }
+        jobKilled = true;
+      }
       //
       // kill all TIPs.
       //
@@ -1927,19 +1977,29 @@ class JobInProgress {
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      if (jobTerminationState == JobStatus.FAILED) {
-        jobFailed = true;
-      } else if (jobTerminationState == JobStatus.KILLED) {
-        jobKilled = true;
-      }
     }
   }
   
   /**
-   * Kill the job and all its component tasks.
+   * Kill the job and all its component tasks. This method is called from 
+   * jobtracker and should return fast as it locks the jobtracker.
    */
-  public synchronized void kill() {
-    terminate(JobStatus.KILLED);
+  public void kill() {
+    boolean killNow = false;
+    synchronized(jobInitKillStatus) {
+      if(jobInitKillStatus.killed) {//job is already marked for killing
+        return;
+      }
+      jobInitKillStatus.killed = true;
+      //if not in middle of init, terminate it now
+      if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
+        //avoiding nested locking by setting flag
+        killNow = true;
+      }
+    }
+    if(killNow) {
+      terminate(JobStatus.KILLED);
+    }
   }
   
   /**
@@ -2312,4 +2372,17 @@ class JobInProgress {
     this.schedulingInfo = schedulingInfo;
     this.status.setSchedulingInfo(schedulingInfo.toString());
   }
+  
+  /**
+   * To keep track of kill and initTasks status of this job. initTasks() take 
+   * a lock on JobInProgress object. kill should avoid waiting on 
+   * JobInProgress lock since it may take a while to do initTasks().
+   */
+  private static class JobInitKillStatus {
+    //flag to be set if kill is called
+    boolean killed;
+    
+    boolean initStarted;
+    boolean initDone;
+  }
 }

+ 1 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2264,9 +2264,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   public synchronized void killJob(JobID jobid) throws IOException {
     JobInProgress job = jobs.get(jobid);
     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
-    if (job.inited()) {
-      job.kill();
-    }
+    job.kill();
   }
 
   /**