Browse Source

MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all task-updates. Contributed by Siddarth Seth.
svn merge --ignore-ancestry -c 1229906 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1229908 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 years ago
parent
commit
8327271efc

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -129,6 +129,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3511. Removed a multitude of cloned/duplicate counters in the AM
     thereby reducing the AM heap size and preventing full GCs. (vinodkv)
 
+    MAPREDUCE-3618. Fixed TaskHeartbeatHandler to not hold a global lock for all
+    task-updates. (Siddarth Seth via vinodkv)
+
   BUG FIXES
     MAPREDUCE-3462. Fix Gridmix JUnit testcase failures. 
                     (Ravi Prakash and Ravi Gummadi via amarrk)

+ 4 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -88,7 +88,7 @@ public class TaskAttemptListenerImpl extends CompositeService
 
   @Override
   public void init(Configuration conf) {
-   registerHeartbeatHandler();
+   registerHeartbeatHandler(conf);
    super.init(conf);
   }
 
@@ -98,9 +98,10 @@ public class TaskAttemptListenerImpl extends CompositeService
     super.start();
   }
 
-  protected void registerHeartbeatHandler() {
+  protected void registerHeartbeatHandler(Configuration conf) {
     taskHeartbeatHandler = new TaskHeartbeatHandler(context.getEventHandler(), 
-        context.getClock());
+        context.getClock(), conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, 
+            MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
     addService(taskHeartbeatHandler);
   }
 

+ 31 - 26
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -18,9 +18,10 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,13 +57,15 @@ public class TaskHeartbeatHandler extends AbstractService {
   private final EventHandler eventHandler;
   private final Clock clock;
 
-  private Map<TaskAttemptId, Long> runningAttempts 
-    = new HashMap<TaskAttemptId, Long>();
+  private ConcurrentMap<TaskAttemptId, Long> runningAttempts;
 
-  public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock) {
+  public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
+      int numThreads) {
     super("TaskHeartbeatHandler");
     this.eventHandler = eventHandler;
     this.clock = clock;
+    runningAttempts =
+      new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads);
   }
 
   @Override
@@ -88,18 +91,16 @@ public class TaskHeartbeatHandler extends AbstractService {
     super.stop();
   }
 
-  public synchronized void receivedPing(TaskAttemptId attemptID) {
-    //only put for the registered attempts
-    if (runningAttempts.containsKey(attemptID)) {
-      runningAttempts.put(attemptID, clock.getTime());
-    }
+  public void receivedPing(TaskAttemptId attemptID) {
+  //only put for the registered attempts
+    runningAttempts.replace(attemptID, clock.getTime());
   }
 
-  public synchronized void register(TaskAttemptId attemptID) {
+  public void register(TaskAttemptId attemptID) {
     runningAttempts.put(attemptID, clock.getTime());
   }
 
-  public synchronized void unregister(TaskAttemptId attemptID) {
+  public void unregister(TaskAttemptId attemptID) {
     runningAttempts.remove(attemptID);
   }
 
@@ -108,25 +109,30 @@ public class TaskHeartbeatHandler extends AbstractService {
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
-        synchronized (TaskHeartbeatHandler.this) {
-          Iterator<Map.Entry<TaskAttemptId, Long>> iterator = 
+        Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
             runningAttempts.entrySet().iterator();
 
-          //avoid calculating current time everytime in loop
-          long currentTime = clock.getTime();
+        // avoid calculating current time everytime in loop
+        long currentTime = clock.getTime();
+
+        while (iterator.hasNext()) {
+          Map.Entry<TaskAttemptId, Long> entry = iterator.next();
+          if (currentTime > entry.getValue() + taskTimeOut) {
 
-          while (iterator.hasNext()) {
-            Map.Entry<TaskAttemptId, Long> entry = iterator.next();
-            if (currentTime > entry.getValue() + taskTimeOut) {
-              //task is lost, remove from the list and raise lost event
+            //In case the iterator isn't picking up the latest.
+            // Extra lookup outside of the iterator - but only if the task
+            // is considered to be timed out.
+            Long taskTime = runningAttempts.get(entry.getKey());
+            if (taskTime != null && currentTime > taskTime + taskTimeOut) {
+              // task is lost, remove from the list and raise lost event
               iterator.remove();
-              eventHandler.handle(
-                  new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(),
-                      "AttemptID:" + entry.getKey().toString() + 
-                      " Timed out after " + taskTimeOut/1000 + " secs"));
-              eventHandler.handle(new TaskAttemptEvent(entry
-                  .getKey(), TaskAttemptEventType.TA_TIMED_OUT));
+              eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+                  .getKey(), "AttemptID:" + entry.getKey().toString()
+                  + " Timed out after " + taskTimeOut / 1000 + " secs"));
+              eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+                  TaskAttemptEventType.TA_TIMED_OUT));
             }
+
           }
         }
         try {
@@ -137,7 +143,6 @@ public class TaskHeartbeatHandler extends AbstractService {
         }
       }
     }
-    
   }
 
 }

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -43,7 +43,7 @@ public class TestTaskAttemptListenerImpl {
     }
     
     @Override
-    protected void registerHeartbeatHandler() {
+    protected void registerHeartbeatHandler(Configuration conf) {
       //Empty
     }