
MAPREDUCE-3569. TaskAttemptListener holds a global lock for all task-updates. (Contributed by Vinod Kumar Vavilapalli)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1227485 13f79535-47bb-0310-9956-ffa450edef68
Siddharth Seth 13 years ago
commit 03d46dc571

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -193,6 +193,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
 
+    MAPREDUCE-3569. TaskAttemptListener holds a global lock for all
+    task-updates. (Vinod Kumar Vavilapalli via sseth)
+
   BUG FIXES
 
     MAPREDUCE-3221. Reenabled the previously ignored test in TestSubmitJob

+ 50 - 50
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -19,14 +19,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,21 +62,22 @@ import org.apache.hadoop.yarn.service.CompositeService;
  * This class HAS to be in this package to access package private 
  * methods/classes.
  */
+@SuppressWarnings({"unchecked" , "deprecation"})
 public class TaskAttemptListenerImpl extends CompositeService 
     implements TaskUmbilicalProtocol, TaskAttemptListener {
 
+  private static final JvmTask TASK_FOR_INVALID_JVM = new JvmTask(null, true);
+
   private static final Log LOG = LogFactory.getLog(TaskAttemptListenerImpl.class);
 
   private AppContext context;
   private Server server;
   protected TaskHeartbeatHandler taskHeartbeatHandler;
   private InetSocketAddress address;
-  private Map<WrappedJvmID, org.apache.hadoop.mapred.Task> jvmIDToActiveAttemptMap = 
-    Collections.synchronizedMap(new HashMap<WrappedJvmID, 
-        org.apache.hadoop.mapred.Task>());
+  private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
+    jvmIDToActiveAttemptMap
+      = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
   private JobTokenSecretManager jobTokenSecretManager = null;
-  private Set<WrappedJvmID> pendingJvms =
-    Collections.synchronizedSet(new HashSet<WrappedJvmID>());
   
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager) {
@@ -123,10 +122,9 @@ public class TaskAttemptListenerImpl extends CompositeService
 
       server.start();
       InetSocketAddress listenerAddress = server.getListenerAddress();
-      this.address =
-          NetUtils.createSocketAddr(listenerAddress.getAddress()
-              .getLocalHost().getCanonicalHostName()
-              + ":" + listenerAddress.getPort());
+      listenerAddress.getAddress();
+      this.address = NetUtils.createSocketAddr(InetAddress.getLocalHost()
+        .getCanonicalHostName() + ":" + listenerAddress.getPort());
     } catch (IOException e) {
       throw new YarnException(e);
     }
@@ -408,57 +406,59 @@ public class TaskAttemptListenerImpl extends CompositeService
 
     WrappedJvmID wJvmID = new WrappedJvmID(jvmId.getJobId(), jvmId.isMap,
         jvmId.getId());
-    synchronized(this) {
-      if(pendingJvms.contains(wJvmID)) {
-        org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap.get(wJvmID);
-        if (task != null) { //there may be lag in the attempt getting added here
-         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-          jvmTask = new JvmTask(task, false);
-
-          //remove the task as it is no more needed and free up the memory
-          //Also we have already told the JVM to process a task, so it is no
-          //longer pending, and further request should ask it to exit.
-          pendingJvms.remove(wJvmID);
-          jvmIDToActiveAttemptMap.remove(wJvmID);
-        }
-      } else {
-        LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
-        jvmTask = new JvmTask(null, true);
-      }
+
+    // Try to look up the task. We remove it directly as we don't give
+    // multiple tasks to a JVM
+    org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap
+        .remove(wJvmID);
+    if (task != null) {
+      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+      jvmTask = new JvmTask(task, false);
+
+      // remove the task as it is no more needed and free up the memory
+      // Also we have already told the JVM to process a task, so it is no
+      // longer pending, and further request should ask it to exit.
+    } else {
+      LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
+      jvmTask = TASK_FOR_INVALID_JVM;
    }
    return jvmTask;
  }
  
  @Override
-  public synchronized void registerPendingTask(WrappedJvmID jvmID) {
-    //Save this JVM away as one that has not been handled yet
-    pendingJvms.add(jvmID);
+  public void registerPendingTask(
+      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
+    // Create the mapping so that it is easy to look up
+    // when the jvm comes back to ask for Task.
+
+    // A JVM not present in this map is an illegal task/JVM.
+    jvmIDToActiveAttemptMap.put(jvmID, task);
  }

  @Override
  public void registerLaunchedTask(
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
-      org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
-    synchronized(this) {
-      //create the mapping so that it is easy to look up
-      //when it comes back to ask for Task.
-      jvmIDToActiveAttemptMap.put(jvmID, task);
-      //This should not need to happen here, but just to be on the safe side
-      if(!pendingJvms.add(jvmID)) {
-        LOG.warn(jvmID+" launched without first being registered");
-      }
-    }
-    //register this attempt
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) {
+
+    // The task is launched. Register this for expiry-tracking.
+
+    // Timing can cause this to happen after the real JVM launches and gets a
+    // task which is still fine as we will only be tracking for expiry a little
+    // late than usual.
    taskHeartbeatHandler.register(attemptID);
  }

  @Override
-  public void unregister(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+  public void unregister(
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
      WrappedJvmID jvmID) {
-    //remove the mapping if not already removed
+
+    // Unregistration also comes from the same TaskAttempt which does the
+    // registration. Events are ordered at TaskAttempt, so unregistration will
+    // always come after registration.
+
+    // remove the mapping if not already removed
    jvmIDToActiveAttemptMap.remove(jvmID);
-    //remove the pending if not already removed
-    pendingJvms.remove(jvmID);
+
    //unregister this attempt
    taskHeartbeatHandler.unregister(attemptID);
  }

+ 9 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java

@@ -32,20 +32,21 @@ public interface TaskAttemptListener {
   InetSocketAddress getAddress();
 
   /**
-   * register a JVM with the listener.  This should be called as soon as a 
+   * Register a JVM with the listener.  This should be called as soon as a 
    * JVM ID is assigned to a task attempt, before it has been launched.
+   * @param task the task itself for this JVM.
    * @param jvmID The ID of the JVM .
    */
-  void registerPendingTask(WrappedJvmID jvmID);
+  void registerPendingTask(Task task, WrappedJvmID jvmID);
   
   /**
-   * Register the task and task attempt with the JVM.  This should be called
-   * when the JVM has been launched.
-   * @param attemptID the id of the attempt for this JVM.
-   * @param task the task itself for this JVM.
-   * @param jvmID the id of the JVM handling the task.
+   * Register task attempt. This should be called when the JVM has been
+   * launched.
+   * 
+   * @param attemptID
+   *          the id of the attempt for this JVM.
    */
-  void registerLaunchedTask(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
+  void registerLaunchedTask(TaskAttemptId attemptID);
 
   /**
    * Unregister the JVM and the attempt associated with it.  This should be 

+ 5 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1109,7 +1109,8 @@ public abstract class TaskAttemptImpl implements
       taskAttempt.jvmID = new WrappedJvmID(
           taskAttempt.remoteTask.getTaskID().getJobID(), 
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
-      taskAttempt.taskAttemptListener.registerPendingTask(taskAttempt.jvmID);
+      taskAttempt.taskAttemptListener.registerPendingTask(
+          taskAttempt.remoteTask, taskAttempt.jvmID);
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1198,10 +1199,9 @@
       taskAttempt.launchTime = taskAttempt.clock.getTime();
       taskAttempt.shufflePort = event.getShufflePort();
 
-      // register it to TaskAttemptListener so that it start listening
-      // for it
-      taskAttempt.taskAttemptListener.registerLaunchedTask(
-          taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
+      // register it to TaskAttemptListener so that it can start monitoring it.
+      taskAttempt.taskAttemptListener
+        .registerLaunchedTask(taskAttempt.attemptId);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:

+ 28 - 11
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -17,8 +17,11 @@
 */
 package org.apache.hadoop.mapred;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 
@@ -68,33 +71,47 @@ public class TestTaskAttemptListenerImpl {
    JVMId id = new JVMId("foo",1, true, 1);
    WrappedJvmID wid = new WrappedJvmID(id.getJobId(), id.isMap, id.getId());

+    // Verify ask before registration.
    //The JVM ID has not been registered yet so we should kill it.
    JvmContext context = new JvmContext();
    context.jvmId = id; 
    JvmTask result = listener.getTask(context);
    assertNotNull(result);
    assertTrue(result.shouldDie);
-    
-    //Now register the JVM, and see
-    listener.registerPendingTask(wid);
-    result = listener.getTask(context);
-    assertNull(result);
-    
+
+    // Verify ask after registration but before launch
    TaskAttemptId attemptID = mock(TaskAttemptId.class);
    Task task = mock(Task.class);
    //Now put a task with the ID
-    listener.registerLaunchedTask(attemptID, task, wid);
+    listener.registerPendingTask(task, wid);
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertFalse(result.shouldDie);
+    // Unregister for more testing.
+    listener.unregister(attemptID, wid);
+
+    // Verify ask after registration and launch
+    //Now put a task with the ID
+    listener.registerPendingTask(task, wid);
+    listener.registerLaunchedTask(attemptID);
    verify(hbHandler).register(attemptID);
    result = listener.getTask(context);
    assertNotNull(result);
    assertFalse(result.shouldDie);
-    
+    // Don't unregister yet for more testing.
+
    //Verify that if we call it again a second time we are told to die.
    result = listener.getTask(context);
    assertNotNull(result);
    assertTrue(result.shouldDie);
-    
+
    listener.unregister(attemptID, wid);
+
+    // Verify after unregistration.
+    result = listener.getTask(context);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+
    listener.stop();
  }
}

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -91,6 +91,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
  * Mock MRAppMaster. Doesn't start RPC servers.
  * No threads are started except of the event Dispatcher thread.
  */
+@SuppressWarnings("unchecked")
 public class MRApp extends MRAppMaster {
   private static final Log LOG = LogFactory.getLog(MRApp.class);
 
@@ -323,13 +324,13 @@ public class MRApp extends MRAppMaster {
        return NetUtils.createSocketAddr("localhost:54321");
      }
      @Override
-      public void registerLaunchedTask(TaskAttemptId attemptID, 
-          org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {}
+      public void registerLaunchedTask(TaskAttemptId attemptID) {}
      @Override
      public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
      }
      @Override
-      public void registerPendingTask(WrappedJvmID jvmID) {
+      public void registerPendingTask(org.apache.hadoop.mapred.Task task,
+          WrappedJvmID jvmID) {
      }
    };
  }
@@ -357,7 +358,6 @@
    public MockContainerLauncher() {
    }

-    @SuppressWarnings("unchecked")
    @Override
    public void handle(ContainerLauncherEvent event) {
      switch (event.getType()) {