Jelajahi Sumber

MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort benchmark consistently. Contributed by Siddarth Seth.
svn merge --ignore-ancestry -c 1231314 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1231316 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 13 tahun lalu
induk
melakukan
77c29ccb53

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -427,6 +427,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs
     may subsequently report as running. (Vinod Kumar Vavilapalli via sseth)
 
+    MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort
+    benchmark consistently. (Siddarth Seth via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 33 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -77,6 +79,9 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+  private Set<WrappedJvmID> launchedJVMs = Collections
+      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
+  
   private JobTokenSecretManager jobTokenSecretManager = null;
   
   public TaskAttemptListenerImpl(AppContext context,
@@ -412,22 +417,28 @@ public class TaskAttemptListenerImpl extends CompositeService
 
     // Try to look up the task. We remove it directly as we don't give
     // multiple tasks to a JVM
-    org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap
-        .remove(wJvmID);
-    if (task != null) {
-      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-      jvmTask = new JvmTask(task, false);
-
-      // remove the task as it is no more needed and free up the memory
-      // Also we have already told the JVM to process a task, so it is no
-      // longer pending, and further request should ask it to exit.
-    } else {
+    if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
       LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
       jvmTask = TASK_FOR_INVALID_JVM;
+    } else {
+      if (!launchedJVMs.contains(wJvmID)) {
+        jvmTask = null;
+        LOG.info("JVM with ID: " + jvmId
+            + " asking for task before AM launch registered. Given null task");
+      } else {
+        // remove the task as it is no more needed and free up the memory.
+        // Also we have already told the JVM to process a task, so it is no
+        // longer pending, and further request should ask it to exit.
+        org.apache.hadoop.mapred.Task task =
+            jvmIDToActiveAttemptMap.remove(wJvmID);
+        launchedJVMs.remove(wJvmID);
+        LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        jvmTask = new JvmTask(task, false);
+      }
     }
     return jvmTask;
   }
-  
+
   @Override
   public void registerPendingTask(
       org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
@@ -440,13 +451,12 @@ public class TaskAttemptListenerImpl extends CompositeService
 
   @Override
   public void registerLaunchedTask(
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) {
-
-    // The task is launched. Register this for expiry-tracking.
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+      WrappedJvmID jvmId) {
+    // The AM considers the task to be launched (Has asked the NM to launch it)
+    // The JVM will only be given a task after this registartion.
+    launchedJVMs.add(jvmId);
 
-    // Timing can cause this to happen after the real JVM launches and gets a
-    // task which is still fine as we will only be tracking for expiry a little
-    // late than usual.
     taskHeartbeatHandler.register(attemptID);
   }
 
@@ -459,7 +469,12 @@ public class TaskAttemptListenerImpl extends CompositeService
     // registration. Events are ordered at TaskAttempt, so unregistration will
     // always come after registration.
 
-    // remove the mapping if not already removed
+    // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
+    // synchronization issue with getTask(). getTask should be checking
+    // jvmIDToActiveAttemptMap before it checks launchedJVMs.
+ 
+    // remove the mappings if not already removed
+    launchedJVMs.remove(jvmID);
     jvmIDToActiveAttemptMap.remove(jvmID);
 
     //unregister this attempt

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java

@@ -45,8 +45,9 @@ public interface TaskAttemptListener {
    * 
    * @param attemptID
    *          the id of the attempt for this JVM.
+   * @param jvmID the ID of the JVM.
    */
-  void registerLaunchedTask(TaskAttemptId attemptID);
+  void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID);
 
   /**
    * Unregister the JVM and the attempt associated with it.  This should be 

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -93,6 +93,7 @@ public class TaskHeartbeatHandler extends AbstractService {
 
   public void receivedPing(TaskAttemptId attemptID) {
   //only put for the registered attempts
+    //TODO throw an exception if the task isn't registered.
     runningAttempts.replace(attemptID, clock.getTime());
   }
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1201,7 +1201,7 @@ public abstract class TaskAttemptImpl implements
 
       // register it to TaskAttemptListener so that it can start monitoring it.
       taskAttempt.taskAttemptListener
-        .registerLaunchedTask(taskAttempt.attemptId);
+        .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -79,21 +80,21 @@ public class TestTaskAttemptListenerImpl {
     assertNotNull(result);
     assertTrue(result.shouldDie);
 
-    // Verify ask after registration but before launch
+    // Verify ask after registration but before launch. 
+    // Don't kill, should be null.
     TaskAttemptId attemptID = mock(TaskAttemptId.class);
     Task task = mock(Task.class);
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
     result = listener.getTask(context);
-    assertNotNull(result);
-    assertFalse(result.shouldDie);
+    assertNull(result);
     // Unregister for more testing.
     listener.unregister(attemptID, wid);
 
     // Verify ask after registration and launch
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
-    listener.registerLaunchedTask(attemptID);
+    listener.registerLaunchedTask(attemptID, wid);
     verify(hbHandler).register(attemptID);
     result = listener.getTask(context);
     assertNotNull(result);

+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -324,7 +324,9 @@ public class MRApp extends MRAppMaster {
         return NetUtils.createSocketAddr("localhost:54321");
       }
       @Override
-      public void registerLaunchedTask(TaskAttemptId attemptID) {}
+      public void registerLaunchedTask(TaskAttemptId attemptID,
+          WrappedJvmID jvmID) {
+      }
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
@@ -463,6 +465,7 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
+    @SuppressWarnings("rawtypes")
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,