Browse Source

commit 483618d2ee7a448c9a824bd86641e0c079d61647
Author: Arun C Murthy <acmurthy@apache.org>
Date: Wed Sep 8 18:25:29 2010 -0700

MAPREDUCE-2055. Fix JobTracker to decouple job retirement from copy of job-history file to HDFS and enhance RetiredJobInfo to carry aggregated job-counters to prevent a disk roundtrip on job-completion to fetch counters for the JobClient. Contributed by Krishna Ramachandran.

+++ b/YAHOO-CHANGES.txt
+
+ MAPREDUCE-2055. Fix JobTracker to decouple job retirement from copy of
+ job-history file to HDFS and enhance RetiredJobInfo to carry aggregated
+ job-counters to prevent a disk roundtrip on job-completion to fetch
+ counters for the JobClient. (Krishna Ramachandran via acmurthy)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077667 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
9657137906

+ 0 - 8
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -1195,14 +1195,6 @@ public class JobInProgress {
     this.historyFile = file;
   }
 
-  boolean isHistoryFileCopied() {
-    return historyFileCopied;
-  }
-
-  synchronized void setHistoryFileCopied() {
-    this.historyFileCopied = true;
-  }
-  
   /**
    * Returns the job-level counters.
    * 

+ 14 - 11
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -263,7 +263,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     */
   static final int MIN_TIME_BEFORE_RETIRE = 0;
 
-
   private int nextJobId = 1;
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
@@ -516,7 +515,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   synchronized void historyFileCopied(JobID jobid, String historyFile) {
     JobInProgress job = getJob(jobid);
     if (job != null) { //found in main cache
-      job.setHistoryFileCopied();
       if (historyFile != null) {
         job.setHistoryFile(historyFile);
       }
@@ -534,9 +532,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     final JobStatus status;
     final JobProfile profile;
     final long finishTime;
+    final Counters counters;
     private String historyFile;
-    RetireJobInfo(JobStatus status, JobProfile profile, long finishTime, 
-        String historyFile) {
+    RetireJobInfo(Counters counters, JobStatus status, JobProfile profile, 
+        long finishTime, String historyFile) {
+      this.counters = counters;
       this.status = status;
       this.profile = profile;
       this.finishTime = finishTime;
@@ -561,7 +561,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
 
     synchronized void addToCache(JobInProgress job) {
-      RetireJobInfo info = new RetireJobInfo(job.getStatus(), 
+      RetireJobInfo info = new RetireJobInfo(job.getCounters(), job.getStatus(),
           job.getProfile(), job.getFinishTime(), job.getHistoryFile());
       jobRetireInfoQ.add(info);
       jobIDStatusMap.put(info.status.getJobID(), info);
@@ -590,10 +590,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
 
     private boolean minConditionToRetire(JobInProgress job, long now) {
-      return job.getStatus().getRunState() != JobStatus.RUNNING &&
-          job.getStatus().getRunState() != JobStatus.PREP &&
-          (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
-          job.isHistoryFileCopied();
+      return job.getStatus().getRunState() != JobStatus.RUNNING
+          && job.getStatus().getRunState() != JobStatus.PREP
+          && (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now);
     }
     /**
      * The run method lives for the life of the JobTracker,
@@ -4294,7 +4293,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // on the JobTracker since it isn't a synchronized method
         return job.getStatus();
       } else {
-        
         RetireJobInfo info = retireJobs.get(jobid);
         if (info != null) {
           return info.status;
@@ -4315,7 +4313,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         aclsManager.checkAccess(job, callerUGI, Operation.VIEW_JOB_COUNTERS);
 
         return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
-      } 
+      } else {
+        RetireJobInfo info = retireJobs.get(jobid);
+        if (info != null) {
+          return info.counters;
+        }
+      }
     }
 
     return completedJobStatusStore.readCounters(jobid);

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestJobHistoryServer.java

@@ -50,7 +50,7 @@ public class TestJobHistoryServer extends TestCase {
       conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
       conf.setLong("mapred.jobtracker.retirejob.interval", 0);
       conf.setLong("mapred.jobtracker.retirejob.check", 0);
-      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 0);
+      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 2);
       conf.set(JobHistoryServer.MAPRED_HISTORY_SERVER_HTTP_ADDRESS,
           "localhost:0");
 
@@ -86,7 +86,7 @@ public class TestJobHistoryServer extends TestCase {
       conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
       conf.setLong("mapred.jobtracker.retirejob.interval", 0);
       conf.setLong("mapred.jobtracker.retirejob.check", 0);
-      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 0);
+      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 2);
       conf.set(JobHistoryServer.MAPRED_HISTORY_SERVER_HTTP_ADDRESS,
           "localhost:8090");
       conf.setBoolean(JobHistoryServer.MAPRED_HISTORY_SERVER_EMBEDDED, false);

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestRawHistoryFile.java

@@ -51,7 +51,7 @@ public class TestRawHistoryFile extends TestCase {
       conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
       conf.setLong("mapred.jobtracker.retirejob.interval", 0);
       conf.setLong("mapred.jobtracker.retirejob.check", 0);
-      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 0);
+      conf.setLong("mapred.jobtracker.completeuserjobs.maximum", 1);
       conf.set("mapreduce.history.server.http.address", "localhost:0");
 
       mrCluster = new MiniMRCluster(1, conf.get("fs.default.name"), 1,