Procházet zdrojové kódy

HADOOP-1446. Update the TaskTracker metrics while the task is running.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@544740 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley před 18 roky
rodič
revize
99a289a5e6

+ 3 - 0
CHANGES.txt

@@ -78,6 +78,9 @@ Trunk (unreleased changes)
  25. HADOOP-1461.  Fix the synchronization of the task tracker to
      avoid lockups in job cleanup.  (Arun C Murthy via omalley)
 
+ 26. HADOOP-1446.  Update the TaskTracker metrics while the task is
+     running. (Devaraj via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

+ 2 - 1
src/java/org/apache/hadoop/mapred/TaskRunner.java

@@ -282,7 +282,8 @@ abstract class TaskRunner extends Thread {
 
         // Add main class and its arguments 
         vargs.add(TaskTracker.Child.class.getName());  // main of Child
-        vargs.add(tracker.taskReportPort + "");        // pass umbilical port
+        // pass umbilical port
+        vargs.add(tracker.getTaskTrackerReportPort() + ""); 
         vargs.add(t.getTaskId());                      // pass task identifier
 
         // Run java

+ 31 - 18
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -92,7 +93,7 @@ public class TaskTracker
   InetSocketAddress jobTrackAddr;
     
   String taskReportBindAddress;
-  int taskReportPort;
+  private int taskReportPort;
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
@@ -120,8 +121,8 @@ public class TaskTracker
    */
   Map<String, TaskInProgress> runningTasks = null;
   Map<String, RunningJob> runningJobs = null;
-  int mapTotal = 0;
-  int reduceTotal = 0;
+  volatile int mapTotal = 0;
+  volatile int reduceTotal = 0;
   boolean justStarted = true;
     
   //dir -> DF
@@ -156,25 +157,33 @@ public class TaskTracker
    */  
   private int probe_sample_size = 50;
     
-  private class TaskTrackerMetrics {
+  private class TaskTrackerMetrics implements Updater {
     private MetricsRecord metricsRecord = null;
+    private int numCompletedTasks = 0;
       
     TaskTrackerMetrics() {
       MetricsContext context = MetricsUtil.getContext("mapred");
       metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
+      context.registerUpdater(this);
     }
       
     synchronized void completeTask() {
-      if (metricsRecord != null) {
-        metricsRecord.incrMetric("tasks_completed", 1);
-      }
+      ++numCompletedTasks;
     }
-      
-    synchronized void update() {
-      if (metricsRecord != null) {
-        metricsRecord.setMetric("maps_running", mapTotal);
-        metricsRecord.setMetric("reduces_running", reduceTotal);
-        metricsRecord.update();
+    /**
+     * Since this object is a registered updater, this method will be called
+     * periodically, e.g. every 5 seconds.
+     */  
+    public void doUpdates(MetricsContext unused) {
+      synchronized (this) {
+        if (metricsRecord != null) {
+          metricsRecord.setMetric("maps_running", mapTotal);
+          metricsRecord.setMetric("reduces_running", reduceTotal);
+          metricsRecord.setMetric("taskSlots", (short)maxCurrentTasks);
+          metricsRecord.incrMetric("tasks_completed", numCompletedTasks);
+          metricsRecord.update();
+        }
+        numCompletedTasks = 0;
       }
     }
   }
@@ -681,6 +690,11 @@ public class TaskTracker
   public FileSystem getFileSystem(){
     return fs;
   }
+  
+  /** Return the port at which the tasktracker bound to */
+  public synchronized int getTaskTrackerReportPort() {
+    return taskReportPort;
+  }
     
   /** Queries the job tracker for a set of outputs ready to be copied
    * @param fromEventId the first event ID we want to start from, this is
@@ -769,8 +783,10 @@ public class TaskTracker
         String msg = "Exiting task tracker for disk error:\n" +
           StringUtils.stringifyException(de);
         LOG.error(msg);
-        jobClient.reportTaskTrackerError(taskTrackerName, 
-                                         "DiskErrorException", msg);
+        synchronized (this) {
+          jobClient.reportTaskTrackerError(taskTrackerName, 
+                                           "DiskErrorException", msg);
+        }
         return State.STALE;
       } catch (RemoteException re) {
         String reClass = re.getClassName();
@@ -852,7 +868,6 @@ public class TaskTracker
           }
           try {
             myMetrics.completeTask();
-            myMetrics.update();
           } catch (MetricsException me) {
             LOG.warn("Caught: " + StringUtils.stringifyException(me));
           }
@@ -1081,7 +1096,6 @@ public class TaskTracker
       } else {
         reduceTotal++;
       }
-      myMetrics.update();
     }
     try {
       localizeJob(tip);
@@ -1461,7 +1475,6 @@ public class TaskTracker
                              failure);
         runningTasks.put(task.getTaskId(), this);
         mapTotal++;
-        myMetrics.update();
       } else {
         LOG.warn("Output already reported lost:"+task.getTaskId());
       }