
commit 2a423584ebf75afbb08b49c9f8267be6973a9e26
Author: Lee Tucker <ltucker@yahoo-inc.com>
Date: Thu Jul 30 17:40:48 2009 -0700

Applying patch 2899836.mr740.patch


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1076962 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
parent commit: c91abf6e6e

conf/log4j.properties (+21, -0)

@@ -3,6 +3,16 @@ hadoop.root.logger=INFO,console
 hadoop.log.dir=.
 hadoop.log.file=hadoop.log
 
+#
+# Job Summary Appender 
+#
+# Use the following logger to send the job summary to a separate file, defined
+# by hadoop.mapreduce.jobsummary.log.file and rolled daily:
+# hadoop.mapreduce.jobsummary.logger=INFO,JSA
+# 
+hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger}
+hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log
+
 # Define the root logger to the system property "hadoop.root.logger".
 log4j.rootLogger=${hadoop.root.logger}, EventCounter
 
@@ -92,3 +102,14 @@ log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
 # Sends counts of logging messages at different severity levels to Hadoop Metrics.
 #
 log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+
+#
+# Job Summary Appender
+#
+log4j.appender.JSA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.JSA.File=${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
+log4j.appender.JSA.layout=org.apache.log4j.PatternLayout
+log4j.appender.JSA.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+log4j.appender.JSA.DatePattern=.yyyy-MM-dd
+log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${hadoop.mapreduce.jobsummary.logger}
+log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false
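
By default the new summary logger inherits ${hadoop.root.logger}, so job summaries keep going to the root log. To route them to the separate daily-rolled file instead, the comment in the patch suggests overriding the logger to use the new JSA appender. A minimal sketch (values illustrative):

    # in conf/log4j.properties: send job summaries to the JSA appender, which
    # writes ${hadoop.log.dir}/${hadoop.mapreduce.jobsummary.log.file}
    hadoop.mapreduce.jobsummary.logger=INFO,JSA

The same property could also be passed as a JVM system property on the JobTracker (for example -Dhadoop.mapreduce.jobsummary.logger=INFO,JSA appended to HADOOP_JOBTRACKER_OPTS in conf/hadoop-env.sh); that wiring is an assumption about the deployment, not part of this patch.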

src/mapred/org/apache/hadoop/mapred/JobInProgress.java (+99, -2)

@@ -203,6 +203,8 @@ class JobInProgress {
     OTHER_LOCAL_MAPS,
     DATA_LOCAL_MAPS,
     RACK_LOCAL_MAPS,
+    SLOTS_MILLIS_MAPS,
+    SLOTS_MILLIS_REDUCES,
     FALLOW_SLOTS_MILLIS_MAPS,
     FALLOW_SLOTS_MILLIS_REDUCES
   }
@@ -2129,7 +2131,24 @@ class JobInProgress {
     }
     return true;
   }
+  
 
+  /**
+   * Metering: Occupied Slots * (Finish - Start)
+   * @param tip {@link TaskInProgress} to be metered which just completed, 
+   *            cannot be <code>null</code> 
+   * @param status {@link TaskStatus} of the completed task, cannot be 
+   *               <code>null</code>
+   */
+  private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
+    Counter slotCounter = 
+      (tip.isMapTask()) ? Counter.SLOTS_MILLIS_MAPS : 
+                          Counter.SLOTS_MILLIS_REDUCES;
+    jobCounters.incrCounter(slotCounter, 
+                            tip.getNumSlotsRequired() * 
+                            (status.getFinishTime() - status.getStartTime()));
+  }
+  
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
@@ -2140,6 +2159,9 @@ class JobInProgress {
     int oldNumAttempts = tip.getActiveTasks().size();
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
+    // Metering
+    meterTaskAttempt(tip, status);
+    
     // Sanity check: is the TIP already complete? 
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
     // finished{Map|Reduce}Tasks variables here because one and only
@@ -2281,6 +2303,12 @@ class JobInProgress {
       this.finishTime = System.currentTimeMillis();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
+      
+      // Log the job summary (this should be done prior to logging to 
+      // job-history to ensure job-counters are in-sync)
+      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+      
+      // Log job-history
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
@@ -2296,6 +2324,9 @@ class JobInProgress {
   private synchronized void terminateJob(int jobTerminationState) {
     if ((status.getRunState() == JobStatus.RUNNING) ||
         (status.getRunState() == JobStatus.PREP)) {
+      // Log the job summary
+      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
+      
       if (jobTerminationState == JobStatus.FAILED) {
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
@@ -2314,6 +2345,7 @@ class JobInProgress {
                                      this.finishedReduceTasks);
       }
       garbageCollect();
+      
       jobtracker.getInstrumentation().terminateJob(
           this.conf, this.status.getJobID());
     }
@@ -2474,9 +2506,13 @@ class JobInProgress {
           failReduce(tip);
         }
       }
+      
+      // Metering
+      meterTaskAttempt(tip, status);
     }
         
-    // the case when the map was complete but the task tracker went down.
+    // The case when the map was complete but the task tracker went down.
+    // However, we don't need to do any metering here...
     if (wasComplete && !isComplete) {
       if (tip.isMapTask()) {
         // Put the task back in the cache. This will help locality for cases
@@ -2667,8 +2703,9 @@ class JobInProgress {
    * from the various tables.
    */
   synchronized void garbageCollect() {
-    //Cancel task tracker reservation
+    // Cancel task tracker reservation
     cancelReservedSlots();
+    
     // Let the JobTracker know that a job is complete
     jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
     jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
@@ -2853,4 +2890,64 @@ class JobInProgress {
   void setClusterSize(int clusterSize) {
     this.clusterSize = clusterSize;
   }
+
+  static class JobSummary {
+    static final Log LOG = LogFactory.getLog(JobSummary.class);
+    
+    // Escape sequences 
+    static final char EQUALS = '=';
+    static final char[] charsToEscape = 
+      {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
+    
+    /**
+     * Log a summary of the job's runtime.
+     * 
+     * @param job {@link JobInProgress} whose summary is to be logged, cannot
+     *            be <code>null</code>.
+     * @param cluster {@link ClusterStatus} of the cluster on which the job was
+     *                run, cannot be <code>null</code>
+     */
+    public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
+      JobStatus status = job.getStatus();
+      JobProfile profile = job.getProfile();
+      String user = StringUtils.escapeString(profile.getUser(), 
+                                             StringUtils.ESCAPE_CHAR, 
+                                             charsToEscape);
+      String queue = StringUtils.escapeString(profile.getQueueName(), 
+                                              StringUtils.ESCAPE_CHAR, 
+                                              charsToEscape);
+      Counters jobCounters = job.getJobCounters();
+      long mapSlotSeconds = 
+        (jobCounters.getCounter(Counter.SLOTS_MILLIS_MAPS) +
+         jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
+      long reduceSlotSeconds = 
+        (jobCounters.getCounter(Counter.SLOTS_MILLIS_REDUCES) +
+         jobCounters.getCounter(Counter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
+
+      LOG.info("jobId=" + job.getJobID() + StringUtils.COMMA +
+               "submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
+               "launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
+               "finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
+               "numMaps" + EQUALS + job.getMapTasks().length + 
+                           StringUtils.COMMA +
+               "numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() + 
+                                  StringUtils.COMMA +
+               "numReduces" + EQUALS + job.getReduceTasks().length + 
+                              StringUtils.COMMA +
+               "numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() + 
+                                     StringUtils.COMMA +
+               "user" + EQUALS + user + StringUtils.COMMA +
+               "queue" + EQUALS + queue + StringUtils.COMMA +
+               "status" + EQUALS + 
+                          JobStatus.getJobRunState(status.getRunState()) + 
+                          StringUtils.COMMA + 
+               "mapSlotSeconds" + EQUALS + mapSlotSeconds + StringUtils.COMMA +
+               "reduceSlotsSeconds" + EQUALS + reduceSlotSeconds  + 
+                                      StringUtils.COMMA +
+               "clusterMapCapacity" + EQUALS + cluster.getMaxMapTasks() + 
+                                      StringUtils.COMMA +
+               "clusterReduceCapacity" + EQUALS + cluster.getMaxReduceTasks()
+      );
+    }
+  }
 }
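
Taken together, meterTaskAttempt and JobSummary.logJobSummary give a per-job accounting of slot usage: each completed or failed attempt is charged occupied slots times wall-clock duration into SLOTS_MILLIS_MAPS/SLOTS_MILLIS_REDUCES, and the summary folds in the pre-existing FALLOW_SLOTS_MILLIS_* counters before converting to seconds. A worked sketch with made-up figures:

    // Illustrative arithmetic only; not part of the patch.
    long numSlotsRequired = 2;                        // hypothetical map attempt occupying 2 slots
    long startTime = 10000L, finishTime = 40000L;     // attempt ran for 30 s (made-up timestamps, in ms)
    long slotMillis = numSlotsRequired * (finishTime - startTime);  // 60000 added to SLOTS_MILLIS_MAPS
    long fallowMillis = 30000L;                       // hypothetical FALLOW_SLOTS_MILLIS_MAPS total
    long mapSlotSeconds = (slotMillis + fallowMillis) / 1000;       // 90, as reported by logJobSummary

With the JSA appender enabled, each finished or terminated job then emits one comma-separated key=value line built from the format in logJobSummary, along the lines of (all values hypothetical):

    jobId=job_200907301740_0001,submitTime=...,launchTime=...,finishTime=...,numMaps=4,numSlotsPerMap=1,numReduces=1,numSlotsPerReduce=1,user=alice,queue=default,status=SUCCEEDED,mapSlotSeconds=90,reduceSlotsSeconds=40,clusterMapCapacity=8,clusterReduceCapacity=4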

src/mapred/org/apache/hadoop/mapred/JobStatus.java (+16, -0)

@@ -48,6 +48,22 @@ public class JobStatus implements Writable, Cloneable {
   public static final int PREP = 4;
   public static final int KILLED = 5;
 
+  private static final String UNKNOWN = "UNKNOWN";
+  private static final String[] runStates =
+      {UNKNOWN, "RUNNING", "SUCCEEDED", "FAILED", "PREP", "KILLED"};
+  
+  /**
+   * Helper method to get human-readable state of the job.
+   * @param state job state
+   * @return human-readable state of the job
+   */
+  public static String getJobRunState(int state) {
+    if (state < 1 || state >= runStates.length) {
+      return UNKNOWN;
+    }
+    return runStates[state];
+  }
+  
   private JobID jobid;
   private float mapProgress;
   private float reduceProgress;
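
getJobRunState maps the integer run-state constants above onto readable names, so the summary's status field logs e.g. "SUCCEEDED" rather than 2, with anything out of range falling back to "UNKNOWN". A tiny usage sketch (illustrative, not part of the patch):

    // JobStatus.SUCCEEDED == 2, so this prints "SUCCEEDED"
    System.out.println(JobStatus.getJobRunState(JobStatus.SUCCEEDED));
    // Unmapped values such as 0 or 42 print "UNKNOWN"
    System.out.println(JobStatus.getJobRunState(42));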

src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (+4, -0)

@@ -1186,4 +1186,8 @@ class TaskInProgress {
   TreeMap<TaskAttemptID, String> getActiveTasks() {
     return activeTasks;
   }
+
+  int getNumSlotsRequired() {
+    return numSlotsRequired;
+  }
 }