浏览代码

HADOOP-5514. Fix JobTracker metrics and add metrics for wating, failed tasks.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@755497 13f79535-47bb-0310-9956-ffa450edef68
Christopher Douglas 16 年之前
父节点
当前提交
9bacee3f1a

+ 3 - 0
CHANGES.txt

@@ -748,6 +748,9 @@ Release 0.20.0 - Unreleased
     HADOOP-5463. Balancer throws "Not a host:port pair" unless port is
     specified in fs.default.name. (Stuart White via hairong)
 
+    HADOOP-5514. Fix JobTracker metrics and add metrics for wating, failed
+    tasks. (cdouglas)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

+ 25 - 19
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -415,6 +415,8 @@ class JobInProgress {
                 (numMapTasks + numReduceTasks) +
                 " exceeds the configured limit " + maxTasks);
     }
+    jobtracker.getInstrumentation().addWaiting(
+        getJobID(), numMapTasks + numReduceTasks);
 
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
@@ -734,8 +736,7 @@ class JobInProgress {
   // Status update methods
   ////////////////////////////////////////////////////
   public synchronized void updateTaskStatus(TaskInProgress tip, 
-                                            TaskStatus status,
-                                            JobTrackerInstrumentation metrics) {
+                                            TaskStatus status) {
 
     double oldProgress = tip.getProgress();   // save old progress
     boolean wasRunning = tip.isRunning();
@@ -833,7 +834,7 @@ class JobInProgress {
         
         // Tell the job to fail the relevant task
         failedTask(tip, taskid, status, ttStatus,
-                   wasRunning, wasComplete, metrics);
+                   wasRunning, wasComplete);
 
         // Did the task failure lead to tip failure?
         TaskCompletionEvent.Status taskCompletionStatus = 
@@ -864,7 +865,7 @@ class JobInProgress {
         this.taskCompletionEvents.add(taskEvent);
         taskCompletionEventTracker++;
         if (state == TaskStatus.State.SUCCEEDED) {
-          completedTask(tip, status, metrics);
+          completedTask(tip, status);
         }
       }
     }
@@ -1267,6 +1268,7 @@ class JobInProgress {
     if (!isScheduled) {
       tip.addRunningTask(id, tts.getTrackerName());
     }
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
 
     // keeping the earlier ordering intact
     String name;
@@ -1285,12 +1287,14 @@ class JobInProgress {
       splits = tip.getSplitNodes();
       if (tip.getActiveTasks().size() > 1)
         speculativeMapTasks++;
+      metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = Counter.TOTAL_LAUNCHED_REDUCES;
       if (tip.getActiveTasks().size() > 1)
         speculativeReduceTasks++;
+      metrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
     // restart has already their logs in place.
@@ -1959,11 +1963,11 @@ class JobInProgress {
    * A taskid assigned to this JobInProgress has reported in successfully.
    */
   public synchronized boolean completedTask(TaskInProgress tip, 
-                                         TaskStatus status,
-                                         JobTrackerInstrumentation metrics) 
+                                            TaskStatus status)
   {
     TaskAttemptID taskid = status.getTaskID();
     int oldNumAttempts = tip.getActiveTasks().size();
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
     // Sanity check: is the TIP already complete? 
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -2047,7 +2051,7 @@ class JobInProgress {
         terminateJob(JobStatus.KILLED);
       }
       else {
-        jobComplete(metrics);
+        jobComplete();
       }
       // The job has been killed/failed/successful
       // JobTracker should cleanup this task
@@ -2085,10 +2089,9 @@ class JobInProgress {
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
-   * 
-   * @param metrics job-tracker metrics
    */
-  private void jobComplete(JobTrackerInstrumentation metrics) {
+  private void jobComplete() {
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     //
     // All tasks are complete, then the job is done!
     //
@@ -2186,12 +2189,12 @@ class JobInProgress {
     while (!mapCleanupTasks.isEmpty()) {
       taskid = mapCleanupTasks.remove(0);
       tip = maps[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+      updateTaskStatus(tip, tip.getTaskStatus(taskid));
     }
     while (!reduceCleanupTasks.isEmpty()) {
       taskid = reduceCleanupTasks.remove(0);
       tip = reduces[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+      updateTaskStatus(tip, tip.getTaskStatus(taskid));
     }
   }
 
@@ -2239,8 +2242,8 @@ class JobInProgress {
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, 
                           TaskTrackerStatus taskTrackerStatus,
-                          boolean wasRunning, boolean wasComplete,
-                          JobTrackerInstrumentation metrics) {
+                          boolean wasRunning, boolean wasComplete) {
+    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
 
@@ -2258,6 +2261,7 @@ class JobInProgress {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
+        metrics.failedMap(taskid);
         // remove from the running queue and put it in the non-running cache
         // if the tip is not complete i.e if the tip still needs to be run
         if (!isComplete) {
@@ -2266,6 +2270,7 @@ class JobInProgress {
         }
       } else {
         runningReduceTasks -= 1;
+        metrics.failedReduce(taskid);
         // remove from the running queue and put in the failed queue if the tip
         // is not complete
         if (!isComplete) {
@@ -2417,7 +2422,7 @@ class JobInProgress {
    */
   public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
                          TaskStatus.Phase phase, TaskStatus.State state, 
-                         String trackerName, JobTrackerInstrumentation metrics) {
+                         String trackerName) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
                                                     0.0f,
@@ -2434,7 +2439,7 @@ class JobInProgress {
     status.setStartTime(startTime);
     status.setFinishTime(System.currentTimeMillis());
     boolean wasComplete = tip.isComplete();
-    updateTaskStatus(tip, status, metrics);
+    updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();
     if (wasComplete && !isComplete) { // mark a successful tip as failed
       String taskType = getTaskType(tip);
@@ -2451,6 +2456,8 @@ class JobInProgress {
    */
   synchronized void garbageCollect() {
     // Let the JobTracker know that a job is complete
+    jobtracker.getInstrumentation(
+        ).decWaiting(getJobID(), pendingMaps() + pendingReduces());
     jobtracker.storeCompletedJob(this);
     jobtracker.finalizeJob(this);
       
@@ -2556,8 +2563,7 @@ class JobInProgress {
   
   synchronized void fetchFailureNotification(TaskInProgress tip, 
                                              TaskAttemptID mapTaskId, 
-                                             String trackerName, 
-                                             JobTrackerInstrumentation metrics) {
+                                             String trackerName) {
     Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
     fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
     mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
@@ -2577,7 +2583,7 @@ class JobInProgress {
       failedTask(tip, mapTaskId, "Too many fetch-failures",                            
                  (tip.isMapTask() ? TaskStatus.Phase.MAP : 
                                     TaskStatus.Phase.REDUCE), 
-                 TaskStatus.State.FAILED, trackerName, metrics);
+                 TaskStatus.State.FAILED, trackerName);
       
       mapTaskIdToFetchFailuresMap.remove(mapTaskId);
     }

+ 19 - 19
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -253,7 +253,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                      tip.isMapTask()? TaskStatus.Phase.MAP:
                                      TaskStatus.Phase.STARTING,
                                      TaskStatus.State.FAILED,
-                                     trackerName, myInstrumentation);
+                                     trackerName);
                   }
                   itr.remove();
                 } else {
@@ -931,7 +931,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // This will add the tip failed event in the new log
         tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
                                 status.getPhase(), status.getRunState(), 
-                                status.getTaskTracker(), myInstrumentation);
+                                status.getTaskTracker());
       }
     }
     
@@ -1045,7 +1045,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       taskStatus.setCounters(counter);
       
       // II. Replay the status
-      job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+      job.updateTaskStatus(tip, taskStatus);
       
       // III. Prevent the task from expiry
       expireLaunchingTasks.removeTask(attemptId);
@@ -1082,7 +1082,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       taskStatus.setDiagnosticInfo(diagInfo); // diag info
 
       // II. Update the task status
-     job.updateTaskStatus(tip, taskStatus, myInstrumentation);
+     job.updateTaskStatus(tip, taskStatus);
 
      // III. Prevent the task from expiry
      expireLaunchingTasks.removeTask(attemptId);
@@ -1231,7 +1231,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
   }
 
-  private JobTrackerInstrumentation myInstrumentation = null;
+  private final JobTrackerInstrumentation myInstrumentation;
     
   /////////////////////////////////////////////////////////////////
   // The real JobTracker
@@ -1457,18 +1457,21 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     
     trackerIdentifier = getDateFormat().format(new Date());
 
-    Class<? extends JobTrackerInstrumentation> metricsInst = getInstrumentationClass(jobConf);
+    // Initialize instrumentation
+    JobTrackerInstrumentation tmp;
+    Class<? extends JobTrackerInstrumentation> metricsInst =
+      getInstrumentationClass(jobConf);
     try {
       java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
         metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
-      this.myInstrumentation = c.newInstance(this, jobConf);
+      tmp = c.newInstance(this, jobConf);
     } catch(Exception e) {
       //Reflection can throw lots of exceptions -- handle them all by 
       //falling back on the default.
       LOG.error("failed to initialize job tracker metrics", e);
-      this.myInstrumentation = new JobTrackerMetricsInst(this, jobConf);
+      tmp = new JobTrackerMetricsInst(this, jobConf);
     }
- 
+    myInstrumentation = tmp;
     
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
@@ -1611,6 +1614,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         t, JobTrackerInstrumentation.class);
   }
 
+  JobTrackerInstrumentation getInstrumentation() {
+    return myInstrumentation;
+  }
+
   public static InetSocketAddress getAddress(Configuration conf) {
     String jobTrackerStr =
       conf.get("mapred.job.tracker", "localhost:8012");
@@ -1736,12 +1743,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     // taskid --> TIP
     taskidToTIPMap.put(taskid, tip);
     
-    // Note this launch
-    if (taskid.isMap()) {
-      myInstrumentation.launchMap(taskid);
-    } else {
-      myInstrumentation.launchReduce(taskid);
-    }
   }
     
   void removeTaskEntry(TaskAttemptID taskid) {
@@ -3249,7 +3250,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
-        job.updateTaskStatus(tip, report, myInstrumentation);
+        job.updateTaskStatus(tip, report);
         JobStatus newStatus = (JobStatus)job.getStatus().clone();
         
         // Update the listeners if an incomplete job completes
@@ -3278,8 +3279,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
             }
             failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, 
                                                              mapTaskId, 
-                                                             failedFetchTrackerName, 
-                                                             myInstrumentation);
+                                                             failedFetchTrackerName);
           }
         }
       }
@@ -3329,7 +3329,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
                             killState,
-                            trackerName, myInstrumentation);
+                            trackerName);
             jobsWithFailures.add(job);
           }
         } else {

+ 12 - 1
src/mapred/org/apache/hadoop/mapred/JobTrackerInstrumentation.java

@@ -31,16 +31,27 @@ class JobTrackerInstrumentation {
   public void completeMap(TaskAttemptID taskAttemptID)
   { }
 
+  public void failedMap(TaskAttemptID taskAttemptID)
+  { }
+
   public void launchReduce(TaskAttemptID taskAttemptID)
   { }
 
   public void completeReduce(TaskAttemptID taskAttemptID)
-  {  }
+  { }
   
+  public void failedReduce(TaskAttemptID taskAttemptID)
+  { }
+
   public void submitJob(JobConf conf, JobID id) 
   { }
     
   public void completeJob(JobConf conf, JobID id) 
   { }
 
+  public void addWaiting(JobID id, int tasks)
+  { }
+
+  public void decWaiting(JobID id, int tasks)
+  { }
 }

+ 42 - 6
src/mapred/org/apache/hadoop/mapred/JobTrackerMetricsInst.java

@@ -22,15 +22,21 @@ import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 
 class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
-  private MetricsRecord metricsRecord = null;
-  int numMapTasksLaunched = 0;
-  int numMapTasksCompleted = 0;
-  int numReduceTasksLaunched = 0;
-  int numReduceTasksCompleted = 0;
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numMapTasksFailed = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numReduceTasksFailed = 0;
   private int numJobsSubmitted = 0;
   private int numJobsCompleted = 0;
+  private int numWaitingTasks = 0;
     
   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
     super(tracker, conf);
@@ -52,15 +58,21 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
     synchronized (this) {
       metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
       metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
       metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
       metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
       metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
-            
+      metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
+
       numMapTasksLaunched = 0;
       numMapTasksCompleted = 0;
+      numMapTasksFailed = 0;
       numReduceTasksLaunched = 0;
       numReduceTasksCompleted = 0;
+      numReduceTasksFailed = 0;
+      numWaitingTasks = 0;
       numJobsSubmitted = 0;
       numJobsCompleted = 0;
     }
@@ -76,6 +88,7 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
   @Override
   public synchronized void launchMap(TaskAttemptID taskAttemptID) {
     ++numMapTasksLaunched;
+    decWaiting(taskAttemptID.getJobID(), 1);
   }
 
   @Override
@@ -83,9 +96,16 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
     ++numMapTasksCompleted;
   }
 
+  @Override
+  public synchronized void failedMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksFailed;
+    addWaiting(taskAttemptID.getJobID(), 1);
+  }
+
   @Override
   public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
     ++numReduceTasksLaunched;
+    decWaiting(taskAttemptID.getJobID(), 1);
   }
 
   @Override
@@ -93,6 +113,12 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
     ++numReduceTasksCompleted;
   }
 
+  @Override
+  public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksFailed;
+    addWaiting(taskAttemptID.getJobID(), 1);
+  }
+
   @Override
   public synchronized void submitJob(JobConf conf, JobID id) {
     ++numJobsSubmitted;
@@ -102,4 +128,14 @@ class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater
   public synchronized void completeJob(JobConf conf, JobID id) {
     ++numJobsCompleted;
   }
+
+  @Override
+  public synchronized void addWaiting(JobID id, int tasks) {
+    numWaitingTasks += tasks;
+  }
+
+  @Override
+  public synchronized void decWaiting(JobID id, int tasks) {
+    numWaitingTasks -= tasks;
+  }
 }