Browse Source

Merge -c 1194854 from branch-0.20-security to branch-0.20-security-204 to fix MAPREDUCE-2355.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1194856 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 years ago
parent
commit
6cca165791
2 changed files with 50 additions and 11 deletions
  1. 18 0
      src/mapred/mapred-default.xml
  2. 32 11
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java

+ 18 - 0
src/mapred/mapred-default.xml

@@ -241,6 +241,24 @@
   </description>
   </description>
 </property>
 </property>
 
 
+<property>
+  <name>mapreduce.tasktracker.outofband.heartbeat.damper</name>
+  <value>1000000</value>
+  <description>When out-of-band heartbeats are enabled, provides
+  damping to avoid overwhelming the JobTracker if too many out-of-band
+  heartbeats would occur. The damping is calculated such that the
+  heartbeat interval is divided by (T*D + 1) where T is the number
+  of completed tasks and D is the damper value.
+  
+  Setting this to a high value like the default provides no damping --
+  as soon as any task finishes, a heartbeat will be sent. Setting this
+  parameter to 0 is equivalent to disabling the out-of-band heartbeat feature.
+  A value of 1 would indicate that, after one task has completed, the
+  time to wait before the next heartbeat would be 1/2 the usual time.
+  After two tasks have finished, it would be 1/3 the usual time, etc.
+  </description>
+</property>
+
 <property>
 <property>
   <name>mapred.jobtracker.restart.recover</name>
   <name>mapred.jobtracker.restart.recover</name>
   <value>false</value>
   <value>false</value>

+ 32 - 11
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -45,6 +45,7 @@ import java.util.Vector;
 import java.util.Map.Entry;
 import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
 import javax.crypto.SecretKey;
 import javax.crypto.SecretKey;
@@ -360,9 +361,13 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   static final String TT_OUTOFBAND_HEARBEAT =
   static final String TT_OUTOFBAND_HEARBEAT =
     "mapreduce.tasktracker.outofband.heartbeat";
     "mapreduce.tasktracker.outofband.heartbeat";
   private volatile boolean oobHeartbeatOnTaskCompletion;
   private volatile boolean oobHeartbeatOnTaskCompletion;
+  static final String TT_OUTOFBAND_HEARTBEAT_DAMPER = 
+    "mapreduce.tasktracker.outofband.heartbeat.damper";
+  static private final int DEFAULT_OOB_HEARTBEAT_DAMPER = 1000000;
+  private volatile int oobHeartbeatDamper;
   
   
   // Track number of completed tasks to send an out-of-band heartbeat
   // Track number of completed tasks to send an out-of-band heartbeat
-  private IntWritable finishedCount = new IntWritable(0);
+  private AtomicInteger finishedCount = new AtomicInteger(0);
   
   
   private MapEventsFetcherThread mapEventsFetcher;
   private MapEventsFetcherThread mapEventsFetcher;
   final int workerThreads;
   final int workerThreads;
@@ -854,6 +859,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     
     
     oobHeartbeatOnTaskCompletion = 
     oobHeartbeatOnTaskCompletion = 
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
       fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+    oobHeartbeatDamper = 
+      fConf.getInt(TT_OUTOFBAND_HEARTBEAT_DAMPER, 
+          DEFAULT_OOB_HEARTBEAT_DAMPER);
   }
   }
 
 
   /**
   /**
@@ -1555,25 +1563,39 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     return recentMapEvents;
     return recentMapEvents;
   }
   }
 
 
+  private long getHeartbeatInterval(int numFinishedTasks) {
+    return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
+  }
+  
   /**
   /**
    * Main service loop.  Will stay in this loop forever.
    * Main service loop.  Will stay in this loop forever.
    */
    */
   State offerService() throws Exception {
   State offerService() throws Exception {
-    long lastHeartbeat = 0;
+    long lastHeartbeat = System.currentTimeMillis();
 
 
     while (running && !shuttingDown) {
     while (running && !shuttingDown) {
       try {
       try {
         long now = System.currentTimeMillis();
         long now = System.currentTimeMillis();
-
-        long waitTime = heartbeatInterval - (now - lastHeartbeat);
-        if (waitTime > 0) {
+        
+        // accelerate to account for multiple finished tasks up-front
+        long remaining = 
+          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+        while (remaining > 0) {
           // sleeps for the wait time or 
           // sleeps for the wait time or 
-          // until there are empty slots to schedule tasks
+          // until there are *enough* empty slots to schedule tasks
           synchronized (finishedCount) {
           synchronized (finishedCount) {
-            if (finishedCount.get() == 0) {
-              finishedCount.wait(waitTime);
+            finishedCount.wait(remaining);
+            
+            // Recompute
+            now = System.currentTimeMillis();
+            remaining = 
+              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
+            
+            if (remaining <= 0) {
+              // Reset count 
+              finishedCount.set(0);
+              break;
             }
             }
-            finishedCount.set(0);
           }
           }
         }
         }
 
 
@@ -2349,8 +2371,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
   private void notifyTTAboutTaskCompletion() {
   private void notifyTTAboutTaskCompletion() {
     if (oobHeartbeatOnTaskCompletion) {
     if (oobHeartbeatOnTaskCompletion) {
       synchronized (finishedCount) {
       synchronized (finishedCount) {
-        int value = finishedCount.get();
-        finishedCount.set(value+1);
+        finishedCount.incrementAndGet();
         finishedCount.notify();
         finishedCount.notify();
       }
       }
     }
     }