瀏覽代碼

HADOOP-1518. Add a session id to job metrics, for use by HOD. Contributed by David Bowen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@549933 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父節點
當前提交
4b70f8e760

+ 3 - 0
CHANGES.txt

@@ -239,6 +239,9 @@ Trunk (unreleased changes)
  73. HADOOP-1512.  Fix failing TestTextInputFormat on Windows.
  73. HADOOP-1512.  Fix failing TestTextInputFormat on Windows.
      (Senthil Subramanian via nigel)
      (Senthil Subramanian via nigel)
 
 
+ 74. HADOOP-1518.  Add a session id to job metrics, for use by HOD.
+     (David Bowen via cutting)
+
 
 
 Release 0.13.0 - 2007-06-08
 Release 0.13.0 - 2007-06-08
 
 

+ 28 - 0
src/java/org/apache/hadoop/mapred/JobConf.java

@@ -596,6 +596,34 @@ public class JobConf extends Configuration {
     set("mapred.job.name", name);
     set("mapred.job.name", name);
   }
   }
   
   
+  /**
+   * Get the user-specified session identifier. The default is the empty string.
+   *
+   * The session identifier is used to tag metric data that is reported to some
+   * performance metrics system via the org.apache.hadoop.metrics API.  The 
+   * session identifier is intended, in particular, for use by Hadoop-On-Demand 
+   * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
+   * HOD will set the session identifier by modifying the hadoop-site.xml file 
+   * before starting the cluster.
+   *
+   * When not running under HOD, this identifer is expected to remain set to 
+   * the empty string.
+   *
+   * @return the session identifier, defaulting to ""
+   */
+  public String getSessionId() {
+      return get("session.id", "");
+  }
+  
+  /**
+   * Set the user-specified session idengifier.  
+   *
+   * @param sessionId the new session id
+   */
+  public void setSessionId(String sessionId) {
+      set("session.id", sessionId);
+  }
+    
   /**
   /**
    * Set the maximum no. of failures of a given job per tasktracker.
    * Set the maximum no. of failures of a given job per tasktracker.
    * 
    * 

+ 2 - 1
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -152,8 +152,9 @@ class JobInProgress {
                                     System.currentTimeMillis(), jobFile); 
                                     System.currentTimeMillis(), jobFile); 
         
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
-    this.jobMetrics = metricsContext.createRecord("job");
+    this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("user", conf.getUser());
+    this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobName", conf.getJobName());
     this.jobMetrics.setTag("jobId", jobid);
     this.jobMetrics.setTag("jobId", jobid);
   }
   }

+ 3 - 2
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -424,9 +424,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     private int numJobsSubmitted = 0;
     private int numJobsSubmitted = 0;
     private int numJobsCompleted = 0;
     private int numJobsCompleted = 0;
       
       
-    JobTrackerMetrics() {
+    JobTrackerMetrics(JobConf conf) {
       MetricsContext context = MetricsUtil.getContext("mapred");
       MetricsContext context = MetricsUtil.getContext("mapred");
       metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
       metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+      metricsRecord.setTag("sessionId", conf.getSessionId());
       context.registerUpdater(this);
       context.registerUpdater(this);
     }
     }
       
       
@@ -651,7 +652,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
 
     this.startTime = System.currentTimeMillis();
     this.startTime = System.currentTimeMillis();
 
 
-    myMetrics = new JobTrackerMetrics();
+    myMetrics = new JobTrackerMetrics(jobConf);
     this.expireTrackersThread = new Thread(this.expireTrackers,
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                            "expireTrackers");
                                            "expireTrackers");
     this.expireTrackersThread.start();
     this.expireTrackersThread.start();

+ 1 - 1
src/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -253,7 +253,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
   public LocalJobRunner(Configuration conf) throws IOException {
   public LocalJobRunner(Configuration conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.fs = FileSystem.get(conf);
     this.conf = conf;
     this.conf = conf;
-    myMetrics = new JobTrackerMetrics();
+    myMetrics = new JobTrackerMetrics(new JobConf(conf));
   }
   }
 
 
   // JobSubmissionProtocol methods
   // JobSubmissionProtocol methods

+ 1 - 0
src/java/org/apache/hadoop/mapred/ReduceTask.java

@@ -747,6 +747,7 @@ class ReduceTask extends Task {
       this.shuffleMetrics = 
       this.shuffleMetrics = 
         MetricsUtil.createRecord(metricsContext, "shuffleInput");
         MetricsUtil.createRecord(metricsContext, "shuffleInput");
       this.shuffleMetrics.setTag("user", conf.getUser());
       this.shuffleMetrics.setTag("user", conf.getUser());
+      this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
 
 
       // Seed the random number generator with a reasonably globally unique seed
       // Seed the random number generator with a reasonably globally unique seed
       long randomSeed = System.nanoTime() + 
       long randomSeed = System.nanoTime() + 

+ 5 - 3
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -162,8 +162,10 @@ public class TaskTracker
     private int numCompletedTasks = 0;
     private int numCompletedTasks = 0;
       
       
     TaskTrackerMetrics() {
     TaskTrackerMetrics() {
+      JobConf conf = getJobConf();
       MetricsContext context = MetricsUtil.getContext("mapred");
       MetricsContext context = MetricsUtil.getContext("mapred");
       metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
       metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
+      metricsRecord.setTag("sessionId", conf.getSessionId());
       context.registerUpdater(this);
       context.registerUpdater(this);
     }
     }
       
       
@@ -317,10 +319,10 @@ public class TaskTracker
     int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
     int numCopiers = this.fConf.getInt("mapred.reduce.parallel.copies", 5);
     //tweak the probe sample size (make it a function of numCopiers)
     //tweak the probe sample size (make it a function of numCopiers)
     probe_sample_size = Math.max(numCopiers*5, 50);
     probe_sample_size = Math.max(numCopiers*5, 50);
-        
-        
+    
+    
     this.myMetrics = new TaskTrackerMetrics();
     this.myMetrics = new TaskTrackerMetrics();
-        
+    
     // port numbers
     // port numbers
     this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
     this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
     // bind address
     // bind address