浏览代码

HADOOP-3864. Prevent the JobTracker from locking up when a job is being
initialized. (acmurthy via omalley)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@684143 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 年之前
父节点
当前提交
905c968b29

+ 3 - 0
CHANGES.txt

@@ -165,6 +165,9 @@ Trunk (unreleased changes)
     HADOOP-3863. Use a thread-local string encoder rather than a static one
     that is protected by a lock. (acmurthy via omalley)
 
+    HADOOP-3864. Prevent the JobTracker from locking up when a job is being
+    initialized. (acmurthy via omalley)
+
   BUG FIXES
 
     HADOOP-3563.  Refactor the distributed upgrade code so that it is 

+ 17 - 6
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -122,7 +123,7 @@ class JobInProgress {
   long finishTime;
 
   private JobConf conf;
-  boolean tasksInited = false;
+  AtomicBoolean tasksInited = new AtomicBoolean(false);
 
   private LocalFileSystem localFs;
   private JobID jobId;
@@ -297,12 +298,22 @@ class JobInProgress {
     }
     return cache;
   }
+  
+  /**
+   * Check if the job has been initialized.
+   * @return <code>true</code> if the job has been initialized, 
+   *         <code>false</code> otherwise
+   */
+  public boolean inited() {
+    return tasksInited.get();
+  }
+  
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
   public synchronized void initTasks() throws IOException {
-    if (tasksInited) {
+    if (tasksInited.get()) {
       return;
     }
 
@@ -341,7 +352,7 @@ class JobInProgress {
       status.setMapProgress(1.0f);
       status.setReduceProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
-      tasksInited = true;
+      tasksInited.set(true);
       JobHistory.JobInfo.logStarted(profile.getJobID(), 
                                     System.currentTimeMillis(), 0, 0);
       JobHistory.JobInfo.logFinished(profile.getJobID(), 
@@ -375,7 +386,7 @@ class JobInProgress {
     }
 
     this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
-    tasksInited = true;
+    tasksInited.set(true);
         
     JobHistory.JobInfo.logStarted(profile.getJobID(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
   }
@@ -663,7 +674,7 @@ class JobInProgress {
                                             int clusterSize, 
                                             int numUniqueHosts
                                            ) throws IOException {
-    if (!tasksInited) {
+    if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
@@ -698,7 +709,7 @@ class JobInProgress {
                                                int clusterSize,
                                                int numUniqueHosts
                                               ) throws IOException {
-    if (!tasksInited) {
+    if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }

+ 9 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1584,7 +1584,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     
   public synchronized void killJob(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    job.kill();
+    if (job.inited()) {
+      job.kill();
+    }
   }
 
   public synchronized JobProfile getJobProfile(JobID jobid) {
@@ -1653,6 +1655,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
     
+  TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+  
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
@@ -1660,11 +1664,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    TaskCompletionEvent[] events;
+    TaskCompletionEvent[] events = EMPTY_EVENTS;
 
     JobInProgress job = this.jobs.get(jobid);
     if (null != job) {
-      events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+      if (job.inited()) {
+        events = job.getTaskCompletionEvents(fromEventId, maxEvents);
+      }
     }
     else {
       events = completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);