Kaynağa Gözat

HADOOP-4296. Fix job client failures by not retiring a job as soon as it
is finished. (dhruba)



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@706533 13f79535-47bb-0310-9956-ffa450edef68

Dhruba Borthakur 16 yıl önce
ebeveyn
işleme
2ddae54fcb

+ 3 - 0
CHANGES.txt

@@ -999,6 +999,9 @@ Release 0.19.0 - Unreleased
     list of jobs to be keyed by the priority, submit time, and job tracker id.
     (Amar Kamat via omalley)
 
+    HADOOP-4296. Fix job client failures by not retiring a job as soon as it
+    is finished. (dhruba)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

+ 3 - 0
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1141,6 +1141,9 @@ public class JobClient extends Configured implements MRConstants, Tool  {
             break;
           }
           running = jc.getJob(jobId);
+          if (running == null) {
+            throw new IOException("Unable to fetch job status from server.");
+          }
           String report = 
             (" map " + StringUtils.formatPercent(running.mapProgress(), 0)+
              " reduce " + 

+ 18 - 3
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -113,6 +113,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   final int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
 
+   /**
+    * The minimum time (in ms) that a job's information has to remain
+    * in the JobTracker's memory before it is retired.
+    */
+  static final int MIN_TIME_BEFORE_RETIRE = 60000;
+
+
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
@@ -343,12 +350,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         try {
           Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long retireBefore = System.currentTimeMillis() - 
-            RETIRE_JOB_INTERVAL;
+          long now = System.currentTimeMillis();
+          long retireBefore = now - RETIRE_JOB_INTERVAL;
+
           synchronized (jobs) {
             for(JobInProgress job: jobs.values()) {
               if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                   job.getStatus().getRunState() != JobStatus.PREP &&
+                  (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
                   (job.getFinishTime()  < retireBefore)) {
                 retiredJobs.add(job);
               }
@@ -1524,7 +1533,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     } catch (IOException ioe) {
       LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
     }
-    
+
+    long now = System.currentTimeMillis();
     
     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
     // in memory; information about the purged jobs is available via
@@ -1553,6 +1563,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
               if (rjob == job) {
                 break;
               }
+
+              // do not retire jobs that finished in the very recent past.
+              if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
+                break;
+              }
                 
               // Cleanup all datastructures
               int rjobRunState =