瀏覽代碼

Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1232184 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 13 年之前
父節點
當前提交
940eeb866c
共有 34 個文件被更改,包括 722 次插入 和 129 次删除
  1. 7 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 4 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
  3. 1 1
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  4. 9 10
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java
  5. 5 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  6. 0 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
  7. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  8. 18 0
      hadoop-mapreduce-project/CHANGES.txt
  9. 33 18
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
  10. 18 14
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
  11. 26 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  12. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
  13. 1 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
  14. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  15. 5 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
  16. 4 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  17. 309 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
  18. 11 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  19. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
  20. 3 7
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  21. 4 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
  22. 41 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
  23. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
  24. 5 5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  25. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  26. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  27. 27 30
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  28. 14 5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java
  29. 5 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  30. 7 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  31. 7 9
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
  32. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  33. 145 5
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
  34. 1 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

+ 7 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -83,6 +83,8 @@ Trunk (unreleased changes)
 
     HADOOP-4515. Configuration#getBoolean must not be case sensitive. (Sho Shimauchi via harsh)
 
+    HADOOP-7968. Errant println left in RPC.getHighestSupportedProtocol (Sho Shimauchi via harsh)
+
   BUGS
 
     HADOOP-7851. Configuration.getClasses() never returns the default value. 
@@ -203,6 +205,8 @@ Release 0.23.1 - Unreleased
     HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead.
     (XieXianshan via harsh)
 
+    HADOOP-7975. Add LZ4 as an entry in the default codec list, missed by HADOOP-7657 (harsh)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -269,6 +273,9 @@ Release 0.23.1 - Unreleased
    HADOOP-7964. Deadlock in NetUtils and SecurityUtil class initialization.
    (Daryn Sharp via suresh)
 
+   HADOOP-7974. TestViewFsTrash incorrectly determines the user's home
+   directory. (harsh via eli)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 4 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java

@@ -791,7 +791,10 @@ public class RPC {
        String protocolName) {    
      Long highestVersion = 0L;
      ProtoClassProtoImpl highest = null;
- System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Size of protoMap for " + rpcKind + " ="
+           + getProtocolImplMap(rpcKind).size());
+     }
      for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv : 
            getProtocolImplMap(rpcKind).entrySet()) {
        if (pv.getKey().protocol.equals(protocolName)) {

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -161,7 +161,7 @@
 
 <property>
   <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
   <description>A list of the compression codec classes that can be used 
                for compression/decompression.</description>
 </property>

+ 9 - 10
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java

@@ -78,17 +78,16 @@ public class TestViewFsTrash {
     // set up viewfs's home dir root to point to home dir root on target
     // But home dir is different on linux, mac etc.
     // Figure it out by calling home dir on target
-    
-   String homeDir = fsTarget.getHomeDirectory().toUri().getPath();
-   int indexOf2ndSlash = homeDir.indexOf('/', 1);
-   String homeDirRoot = homeDir.substring(0, indexOf2ndSlash);
-   ConfigUtil.addLink(conf, homeDirRoot,
-       fsTarget.makeQualified(new Path(homeDirRoot)).toUri()); 
-   ConfigUtil.setHomeDirConf(conf, homeDirRoot);
-   Log.info("Home dir base " + homeDirRoot);
-    
+
+    String homeDirRoot = fsTarget.getHomeDirectory()
+        .getParent().toUri().getPath();
+    ConfigUtil.addLink(conf, homeDirRoot,
+        fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
+    ConfigUtil.setHomeDirConf(conf, homeDirRoot);
+    Log.info("Home dir base " + homeDirRoot);
+
     fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget);
-    
+
     // set working dir so that relative paths
     //fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath()));
     conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString());

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -264,6 +264,8 @@ Release 0.23.1 - UNRELEASED
 
     HDFS-69. Improve the 'dfsadmin' commandline help. (harsh)
 
+    HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code (eli)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)
@@ -327,6 +329,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2707. HttpFS should read the hadoop-auth secret from a file 
     instead inline from the configuration. (tucu)
 
+    HDFS-2790. FSNamesystem.setTimes throws exception with wrong
+    configuration name in the message. (Arpit Gupta via eli)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java

@@ -86,7 +86,6 @@ public final class HdfsServerConstants {
   public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
   public static int WRITE_TIMEOUT = 8 * 60 * 1000;
   public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
-  public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
 
   /**
    * Defines the NameNode role.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1219,7 +1219,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
-                            " Please set dfs.support.accessTime configuration parameter.");
+                            " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
     }
     writeLock();
     try {

+ 18 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -481,6 +481,24 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken.
     (Jason Lowe via mahadev)
 
+    MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs
+    may subsequently report as running. (Vinod Kumar Vavilapalli via sseth)
+
+    MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort
+    benchmark consistently. (Siddarth Seth via vinodkv)
+
+    MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral
+    web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv)
+
+    MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable
+    speculating either maps or reduces. (Eric Payne via vinodkv)
+
+    MAPREDUCE-3649. Job End notification gives an error on calling back.
+    (Ravi Prakash via mahadev)
+
+    MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe 
+    via mahadev)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

+ 33 - 18
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -77,6 +79,9 @@ public class TaskAttemptListenerImpl extends CompositeService
   private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+  private Set<WrappedJvmID> launchedJVMs = Collections
+      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
+  
   private JobTokenSecretManager jobTokenSecretManager = null;
   
   public TaskAttemptListenerImpl(AppContext context,
@@ -412,22 +417,28 @@ public class TaskAttemptListenerImpl extends CompositeService
 
     // Try to look up the task. We remove it directly as we don't give
     // multiple tasks to a JVM
-    org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap
-        .remove(wJvmID);
-    if (task != null) {
-      LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
-      jvmTask = new JvmTask(task, false);
-
-      // remove the task as it is no more needed and free up the memory
-      // Also we have already told the JVM to process a task, so it is no
-      // longer pending, and further request should ask it to exit.
-    } else {
+    if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
       LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
       jvmTask = TASK_FOR_INVALID_JVM;
+    } else {
+      if (!launchedJVMs.contains(wJvmID)) {
+        jvmTask = null;
+        LOG.info("JVM with ID: " + jvmId
+            + " asking for task before AM launch registered. Given null task");
+      } else {
+        // remove the task as it is no longer needed and free up the memory.
+        // Also we have already told the JVM to process a task, so it is no
+        // longer pending, and further request should ask it to exit.
+        org.apache.hadoop.mapred.Task task =
+            jvmIDToActiveAttemptMap.remove(wJvmID);
+        launchedJVMs.remove(wJvmID);
+        LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        jvmTask = new JvmTask(task, false);
+      }
     }
     return jvmTask;
   }
-  
+
   @Override
   public void registerPendingTask(
       org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
@@ -440,13 +451,12 @@ public class TaskAttemptListenerImpl extends CompositeService
 
   @Override
   public void registerLaunchedTask(
-      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) {
-
-    // The task is launched. Register this for expiry-tracking.
+      org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+      WrappedJvmID jvmId) {
+    // The AM considers the task to be launched (Has asked the NM to launch it)
+    // The JVM will only be given a task after this registration.
+    launchedJVMs.add(jvmId);
 
-    // Timing can cause this to happen after the real JVM launches and gets a
-    // task which is still fine as we will only be tracking for expiry a little
-    // late than usual.
     taskHeartbeatHandler.register(attemptID);
   }
 
@@ -459,7 +469,12 @@ public class TaskAttemptListenerImpl extends CompositeService
     // registration. Events are ordered at TaskAttempt, so unregistration will
     // always come after registration.
 
-    // remove the mapping if not already removed
+    // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
+    // synchronization issue with getTask(). getTask should be checking
+    // jvmIDToActiveAttemptMap before it checks launchedJVMs.
+ 
+    // remove the mappings if not already removed
+    launchedJVMs.remove(jvmID);
     jvmIDToActiveAttemptMap.remove(jvmID);
 
     //unregister this attempt

+ 18 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java

@@ -19,12 +19,11 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import java.io.IOException;
-import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
 import java.net.Proxy;
+import java.net.URL;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +39,8 @@ import org.mortbay.log.Log;
  * User can specify number of retry attempts and a time interval at which to
  * attempt retries</li><li>
  * Cluster administrators can set final parameters to set maximum number of
- * tries (0 would disable job end notification) and max time interval</li><li>
+ * tries (0 would disable job end notification) and max time interval and a
+ * proxy if needed</li><li>
  * The URL may contain sentinels which will be replaced by jobId and jobStatus 
  * (eg. SUCCEEDED/KILLED/FAILED) </li> </ul>
  * </p>
@@ -59,8 +59,8 @@ public class JobEndNotifier implements Configurable {
 
   /**
    * Parse the URL that needs to be notified of the end of the job, along
-   * with the number of retries in case of failure and the amount of time to
-   * wait between retries
+   * with the number of retries in case of failure, the amount of time to
+   * wait between retries and proxy settings
    * @param conf the configuration 
    */
   public void setConf(Configuration conf) {
@@ -119,15 +119,19 @@ public class JobEndNotifier implements Configurable {
     boolean success = false;
     try {
       Log.info("Job end notification trying " + urlToNotify);
-      URLConnection conn = urlToNotify.openConnection(proxyToUse);
+      HttpURLConnection conn = (HttpURLConnection) urlToNotify.openConnection();
       conn.setConnectTimeout(5*1000);
       conn.setReadTimeout(5*1000);
       conn.setAllowUserInteraction(false);
-      InputStream is = conn.getInputStream();
-      conn.getContent();
-      is.close();
-      success = true;
-      Log.info("Job end notification to " + urlToNotify + " succeeded");
+      if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+        Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+        +"\"");
+      }
+      else {
+        success = true;
+        Log.info("Job end notification to " + urlToNotify + " succeeded");
+      }
     } catch(IOException ioe) {
       Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
     }
@@ -135,8 +139,8 @@ public class JobEndNotifier implements Configurable {
   }
 
   /**
-   * Notify a server of the completion of a submitted job. The server must have
-   * configured MRConfig.JOB_END_NOTIFICATION_URLS
+   * Notify a server of the completion of a submitted job. The user must have
+   * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
    * @param jobReport JobReport used to read JobId and JobStatus
    * @throws InterruptedException
    */

+ 26 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -258,7 +258,7 @@ public class MRAppMaster extends CompositeService {
     dispatcher.register(TaskAttemptEventType.class, 
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
-    
+   
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
       //optional service to speculate on task attempts' progress
@@ -881,9 +881,31 @@ public class MRAppMaster extends CompositeService {
     }
     @Override
     public void handle(SpeculatorEvent event) {
-      if (!disabled && 
-          (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
+      if (disabled) {
+        return;
+      }
+
+      TaskId tId = event.getTaskID();
+      TaskType tType = null;
+      /* event's TaskId will be null if the event type is JOB_CREATE or
+       * ATTEMPT_STATUS_UPDATE
+       */
+      if (tId != null) {
+        tType = tId.getTaskType(); 
+      }
+      boolean shouldMapSpec =
+              conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+      boolean shouldReduceSpec =
+              conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+      /* The point of the following is to allow the MAP and REDUCE speculative
+       * config values to be independent:
+       * IF spec-exec is turned on for maps AND the task is a map task
+       * OR IF spec-exec is turned on for reduces AND the task is a reduce task
+       * THEN call the speculator to handle the event.
+       */
+      if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
+        || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
         // Speculator IS enabled, direct the event to there.
         speculator.handle(event);
       }

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java

@@ -45,8 +45,9 @@ public interface TaskAttemptListener {
    * 
    * @param attemptID
    *          the id of the attempt for this JVM.
+   * @param jvmID the ID of the JVM.
    */
-  void registerLaunchedTask(TaskAttemptId attemptID);
+  void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID);
 
   /**
    * Unregister the JVM and the attempt associated with it.  This should be 

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java

@@ -93,6 +93,7 @@ public class TaskHeartbeatHandler extends AbstractService {
 
   public void receivedPing(TaskAttemptId attemptID) {
   //only put for the registered attempts
+    //TODO throw an exception if the task isn't registered.
     runningAttempts.replace(attemptID, clock.getTime());
   }
 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1201,7 +1201,7 @@ public abstract class TaskAttemptImpl implements
 
       // register it to TaskAttemptListener so that it can start monitoring it.
       taskAttempt.taskAttemptListener
-        .registerLaunchedTask(taskAttempt.attemptId);
+        .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:

+ 5 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -79,21 +80,21 @@ public class TestTaskAttemptListenerImpl {
     assertNotNull(result);
     assertTrue(result.shouldDie);
 
-    // Verify ask after registration but before launch
+    // Verify ask after registration but before launch. 
+    // Don't kill, should be null.
     TaskAttemptId attemptID = mock(TaskAttemptId.class);
     Task task = mock(Task.class);
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
     result = listener.getTask(context);
-    assertNotNull(result);
-    assertFalse(result.shouldDie);
+    assertNull(result);
     // Unregister for more testing.
     listener.unregister(attemptID, wid);
 
     // Verify ask after registration and launch
     //Now put a task with the ID
     listener.registerPendingTask(task, wid);
-    listener.registerLaunchedTask(attemptID);
+    listener.registerLaunchedTask(attemptID, wid);
     verify(hbHandler).register(attemptID);
     result = listener.getTask(context);
     assertNotNull(result);

+ 4 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -324,7 +324,9 @@ public class MRApp extends MRAppMaster {
         return NetUtils.createSocketAddr("localhost:54321");
       }
       @Override
-      public void registerLaunchedTask(TaskAttemptId attemptID) {}
+      public void registerLaunchedTask(TaskAttemptId attemptID,
+          WrappedJvmID jvmID) {
+      }
       @Override
       public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
       }
@@ -463,6 +465,7 @@ public class MRApp extends MRAppMaster {
       return localStateMachine;
     }
 
+    @SuppressWarnings("rawtypes")
     public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
         Configuration conf, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Clock clock,

+ 309 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java

@@ -0,0 +1,309 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSpeculativeExecution {
+
+  /*
+   * This class is used to control when speculative execution happens.
+   */
+  public static class TestSpecEstimator extends LegacyTaskRuntimeEstimator {
+    private static final long SPECULATE_THIS = 999999L;
+
+    public TestSpecEstimator() {
+      super();
+    }
+
+    /*
+     * This will only be called if speculative execution is turned on.
+     * 
+     * If either mapper or reducer speculation is turned on, this will be
+     * called.
+     * 
+     * This will cause speculation to engage for the first mapper or first
+     * reducer (that is, attempt ID "*_m_000000_0" or "*_r_000000_0")
+     * 
+     * If this attempt is killed, the retry will have attempt id 1, so it
+     * will not engage speculation again.
+     */
+    @Override
+    public long estimatedRuntime(TaskAttemptId id) {
+      if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) {
+        return SPECULATE_THIS;
+      }
+      return super.estimatedRuntime(id);
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
+  protected static MiniMRYarnCluster mrCluster;
+
+  private static Configuration initialConf = new Configuration();
+  private static FileSystem localFs;
+  static {
+    try {
+      localFs = FileSystem.getLocal(initialConf);
+    } catch (IOException io) {
+      throw new RuntimeException("problem getting local fs", io);
+    }
+  }
+
+  private static Path TEST_ROOT_DIR =
+          new Path("target",TestSpeculativeExecution.class.getName() + "-tmpDir")
+               .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+  static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+  private static Path TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
+
+  @BeforeClass
+  public static void setup() throws IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4);
+      Configuration conf = new Configuration();
+      mrCluster.init(conf);
+      mrCluster.start();
+    }
+
+    // work around the absent public distcache.
+    localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+    localFs.setPermission(APP_JAR, new FsPermission("700"));
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrCluster != null) {
+      mrCluster.stop();
+      mrCluster = null;
+    }
+  }
+
+  public static class SpeculativeMapper extends
+    Mapper<Object, Text, Text, IntWritable> {
+
+    public void map(Object key, Text value, Context context)
+            throws IOException, InterruptedException {
+      // Make one mapper slower for speculative execution
+      TaskAttemptID taid = context.getTaskAttemptID();
+      long sleepTime = 100;
+      Configuration conf = context.getConfiguration();
+      boolean test_speculate_map =
+              conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+
+      // IF TESTING MAPPER SPECULATIVE EXECUTION:
+      //   Make the "*_m_000000_0" attempt take much longer than the others.
+      //   When speculative execution is enabled, this should cause the attempt
+      //   to be killed and restarted. At that point, the attempt ID will be
+      //   "*_m_000000_1", so sleepTime will still remain 100ms.
+      if ( (taid.getTaskType() == TaskType.MAP) && test_speculate_map
+            && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
+        sleepTime = 10000;
+      }
+      try{
+        Thread.sleep(sleepTime);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      context.write(value, new IntWritable(1));
+    }
+  }
+
+  public static class SpeculativeReducer extends
+    Reducer<Text,IntWritable,Text,IntWritable> {
+
+    public void reduce(Text key, Iterable<IntWritable> values, 
+                           Context context) throws IOException, InterruptedException {
+      // Make one reducer slower for speculative execution
+      TaskAttemptID taid = context.getTaskAttemptID();
+      long sleepTime = 100;
+      Configuration conf = context.getConfiguration();
+      boolean test_speculate_reduce =
+                conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+      // IF TESTING REDUCE SPECULATIVE EXECUTION:
+      //   Make the "*_r_000000_0" attempt take much longer than the others.
+      //   When speculative execution is enabled, this should cause the attempt
+      //   to be killed and restarted. At that point, the attempt ID will be
+      //   "*_r_000000_1", so sleepTime will still remain 100ms.
+      if ( (taid.getTaskType() == TaskType.REDUCE) && test_speculate_reduce
+            && (taid.getTaskID().getId() == 0) && (taid.getId() == 0)) {
+        sleepTime = 10000;
+      }
+      try{
+        Thread.sleep(sleepTime);
+      } catch(InterruptedException ie) {
+        // Ignore
+      }
+      context.write(key,new IntWritable(0));
+    }
+  }
+
+  @Test
+  public void testSpeculativeExecution() throws Exception {
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+           + " not found. Not running test.");
+      return;
+    }
+
+    /*------------------------------------------------------------------
+     * Test that Map/Red does not speculate if MAP_SPECULATIVE and 
+     * REDUCE_SPECULATIVE are both false.
+     * -----------------------------------------------------------------
+     */
+    Job job = runSpecTest(false, false);
+
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Counters counters = job.getCounters();
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+    Assert.assertEquals(0, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+            .getValue());
+
+    /*----------------------------------------------------------------------
+     * Test that Mapper speculates if MAP_SPECULATIVE is true and
+     * REDUCE_SPECULATIVE is false.
+     * ---------------------------------------------------------------------
+     */
+    job = runSpecTest(true, false);
+
+    succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    counters = job.getCounters();
+
+    // The long-running map will be killed and a new one started.
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+    Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+            .getValue());
+
+    /*----------------------------------------------------------------------
+     * Test that Reducer speculates if REDUCE_SPECULATIVE is true and
+     * MAP_SPECULATIVE is false.
+     * ---------------------------------------------------------------------
+     */
+    job = runSpecTest(false, true);
+
+    succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    counters = job.getCounters();
+
+    // The long-running reduce will be killed and a new one started.
+    Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+            .getValue());
+    Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES)
+            .getValue());
+  }
+
+  private Path createTempFile(String filename, String contents)
+      throws IOException {
+    Path path = new Path(TEST_ROOT_DIR, filename);
+    FSDataOutputStream os = localFs.create(path);
+    os.writeBytes(contents);
+    os.close();
+    localFs.setPermission(path, new FsPermission("700"));
+    return path;
+  }
+  
+  private Job runSpecTest(boolean mapspec, boolean redspec)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    Path first = createTempFile("specexec_map_input1", "a\nz");
+    Path secnd = createTempFile("specexec_map_input2", "a\nz");
+
+    Configuration conf = mrCluster.getConfig();
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE,mapspec);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE,redspec);
+    conf.setClass(MRJobConfig.MR_AM_TASK_ESTIMATOR,
+            TestSpecEstimator.class,
+            TaskRuntimeEstimator.class);
+
+    Job job = Job.getInstance(conf);
+    job.setJarByClass(TestSpeculativeExecution.class);
+    job.setMapperClass(SpeculativeMapper.class);
+    job.setReducerClass(SpeculativeReducer.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setNumReduceTasks(2);
+    FileInputFormat.setInputPaths(job, first);
+    FileInputFormat.addInputPath(job, secnd);
+    FileOutputFormat.setOutputPath(job, TEST_OUT_DIR);
+
+    // Delete output directory if it exists.
+    try {
+      localFs.delete(TEST_OUT_DIR,true);
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // Creates the Job Configuration
+    job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+    job.createSymlink();
+    job.setMaxMapAttempts(2);
+
+    job.submit();
+
+    return job;
+  }
+}

+ 11 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -205,6 +205,17 @@ public class BuilderUtils {
     return nodeId;
   }
 
+  public static ContainerStatus newContainerStatus(ContainerId containerId,
+      ContainerState containerState, String diagnostics, int exitStatus) {
+    ContainerStatus containerStatus = recordFactory
+      .newRecordInstance(ContainerStatus.class);
+    containerStatus.setState(containerState);
+    containerStatus.setContainerId(containerId);
+    containerStatus.setDiagnostics(diagnostics);
+    containerStatus.setExitStatus(exitStatus);
+    return containerStatus;
+  }
+
   public static Container newContainer(ContainerId containerId,
       NodeId nodeId, String nodeHttpAddress,
       Resource resource, Priority priority, ContainerToken containerToken) {

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml

@@ -109,6 +109,7 @@
                 </goals>
                 <configuration>
                   <mainClass>org.apache.hadoop.yarn.util.VisualizeStateMachine</mainClass>
+		  <classpathScope>compile</classpathScope>
                   <arguments>
                     <argument>NodeManager</argument>
                     <argument>org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl,

+ 3 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerImpl implements Container {
@@ -370,13 +371,8 @@ public class ContainerImpl implements Container {
   public ContainerStatus cloneAndGetContainerStatus() {
     this.readLock.lock();
     try {
-      ContainerStatus containerStatus =
-          recordFactory.newRecordInstance(ContainerStatus.class);
-      containerStatus.setState(getCurrentState());
-      containerStatus.setContainerId(this.launchContext.getContainerId());
-      containerStatus.setDiagnostics(diagnostics.toString());
-  	  containerStatus.setExitStatus(exitCode);
-      return containerStatus;
+      return BuilderUtils.newContainerStatus(this.getContainerID(),
+        getCurrentState(), diagnostics.toString(), exitCode);
     } finally {
       this.readLock.unlock();
     }

+ 4 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
 import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,9 @@ public class WebServer extends AbstractService {
       this.webApp =
           WebApps.$for("node", Context.class, this.nmContext, "ws")
               .at(bindAddress).with(getConfig()).start(this.nmWebApp);
+      int port = this.webApp.httpServer().getPort();
+      String webAddress = StringUtils.split(bindAddress, ':')[0] + ":" + port;
+      getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddress);
     } catch (Exception e) {
       String msg = "NMWebapps failed to start.";
       LOG.error(msg, e);

+ 41 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -72,6 +74,45 @@ public class TestNMWebServer {
     FileUtil.fullyDelete(testRootDir);
     FileUtil.fullyDelete(testLogDir);
   }
+  
+  private String startNMWebAppServer(String webAddr) {
+    Context nmContext = new NodeManager.NMContext();
+    ResourceView resourceView = new ResourceView() {
+      @Override
+      public long getVmemAllocatedForContainers() {
+        return 0;
+      }
+      @Override
+      public long getPmemAllocatedForContainers() {
+        return 0;
+      }
+    };
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
+    NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
+    healthChecker.init(conf);
+    LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
+    conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
+    WebServer server = new WebServer(nmContext, resourceView,
+        new ApplicationACLsManager(conf), dirsHandler);
+    server.init(conf);
+    server.start();
+    String webAppAddr = conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS);
+    return StringUtils.split(webAppAddr, ':')[1];
+  }
+  
+  @Test
+  public void testNMWebAppWithOutPort() throws IOException {
+    String port = startNMWebAppServer("0.0.0.0");
+    Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0);
+  }
+  
+  @Test
+  public void testNMWebAppWithEphemeralPort() throws IOException {
+    String port = startNMWebAppServer("0.0.0.0:0"); 
+    Assert.assertTrue("Port is not updated", Integer.parseInt(port) > 0);
+  }
 
   @Test
   public void testNMWebApp() throws IOException {

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml

@@ -137,6 +137,7 @@
                 </goals>
                 <configuration>
                   <mainClass>org.apache.hadoop.yarn.util.VisualizeStateMachine</mainClass>
+		  <classpathScope>compile</classpathScope>
                   <arguments>
                     <argument>ResourceManager</argument>
                     <argument>org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl,

+ 5 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -67,16 +67,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
-import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
-import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet;
-import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -256,7 +256,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   }
 
   @Private
-  public static final class SchedulerEventDispatcher extends AbstractService
+  public static class SchedulerEventDispatcher extends AbstractService
       implements EventHandler<SchedulerEvent> {
 
     private final ResourceScheduler scheduler;

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -265,8 +265,8 @@ public class ResourceTrackerService extends AbstractService implements
     HeartbeatResponse latestResponse = recordFactory
         .newRecordInstance(HeartbeatResponse.class);
     latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
-    latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
-    latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
+    latestResponse.addAllContainersToCleanup(rmNode.getContainersToCleanUp());
+    latestResponse.addAllApplicationsToCleanup(rmNode.getAppsToCleanup());
     latestResponse.setNodeAction(NodeAction.NORMAL);
 
     // 4. Send status to RMNode, saving the latest response.

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -101,9 +101,9 @@ public interface RMNode {
   
   public RMNodeState getState();
 
-  public List<ContainerId> pullContainersToCleanUp();
+  public List<ContainerId> getContainersToCleanUp();
 
-  public List<ApplicationId> pullAppsToCleanup();
+  public List<ApplicationId> getAppsToCleanup();
 
   public HeartbeatResponse getLastHeartBeatResponse();
 }

+ 27 - 30
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -89,7 +89,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* set of containers that have just launched */
   private final Map<ContainerId, ContainerStatus> justLaunchedContainers = 
     new HashMap<ContainerId, ContainerStatus>();
-  
 
   /* set of containers that need to be cleaned */
   private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
@@ -248,54 +247,38 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   }
 
   @Override
-  public List<ApplicationId> pullAppsToCleanup() {
-    this.writeLock.lock();
-
-    try {
-      List<ApplicationId> lastfinishedApplications = new ArrayList<ApplicationId>();
-      lastfinishedApplications.addAll(this.finishedApplications);
-      this.finishedApplications.clear();
-      return lastfinishedApplications;
-    } finally {
-      this.writeLock.unlock();
-    }
-
-  }
-
-  @Private
-  public List<ContainerId> getContainersToCleanUp() {
+  public List<ApplicationId> getAppsToCleanup() {
     this.readLock.lock();
+
     try {
-      return new ArrayList<ContainerId>(containersToClean);
+      return new ArrayList<ApplicationId>(this.finishedApplications);
     } finally {
       this.readLock.unlock();
     }
+
   }
   
   @Override
-  public List<ContainerId> pullContainersToCleanUp() {
+  public List<ContainerId> getContainersToCleanUp() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
-      List<ContainerId> containersToCleanUp = new ArrayList<ContainerId>();
-      containersToCleanUp.addAll(this.containersToClean);
-      this.containersToClean.clear();
-      return containersToCleanUp;
+      return new ArrayList<ContainerId>(this.containersToClean);
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   };
 
   @Override
   public HeartbeatResponse getLastHeartBeatResponse() {
 
-    this.writeLock.lock();
+    this.readLock.lock();
 
     try {
       return this.latestHeartBeatResponse;
     } finally {
-      this.writeLock.unlock();
+      this.readLock.unlock();
     }
   }
 
@@ -407,14 +390,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
         ContainerId containerId = remoteContainer.getContainerId();
         
-        // Don't bother with containers already scheduled for cleanup,
-        // the scheduler doens't need to know any more about this container
+        // Don't bother with containers already scheduled for cleanup, or for
+        // applications already killed. The scheduler doesn't need to know any
+        // more about this container
         if (rmNode.containersToClean.contains(containerId)) {
           LOG.info("Container " + containerId + " already scheduled for " +
           		"cleanup, no further processing");
           continue;
         }
-        
+        if (rmNode.finishedApplications.contains(containerId
+            .getApplicationAttemptId().getApplicationId())) {
+          LOG.info("Container " + containerId
+              + " belongs to an application that is already killed,"
+              + " no further processing");
+          continue;
+        }
+
         // Process running containers
         if (remoteContainer.getState() == ContainerState.RUNNING) {
           if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
@@ -435,6 +426,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       
       rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
           statusEvent.getKeepAliveAppIds());
+
+      // HeartBeat processing from our end is done, as node pulls the following
+      // lists before sending status-updates. Clear data-structures
+      rmNode.containersToClean.clear();
+      rmNode.finishedApplications.clear();
+
       return RMNodeState.RUNNING;
     }
   }

+ 14 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java

@@ -39,9 +39,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -61,6 +62,7 @@ import com.google.common.collect.Multiset;
  * Each running Application in the RM corresponds to one instance
  * of this class.
  */
+@SuppressWarnings("unchecked")
 public class SchedulerApp {
 
   private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
@@ -174,13 +176,20 @@ public class SchedulerApp {
     this.appSchedulingInfo.stop(rmAppAttemptFinalState);
   }
 
-  synchronized public void containerLaunchedOnNode(ContainerId containerId) {
+  public synchronized void containerLaunchedOnNode(ContainerId containerId,
+      NodeId nodeId) {
     // Inform the container
     RMContainer rmContainer = 
         getRMContainer(containerId);
-    rmContainer.handle(
-        new RMContainerEvent(containerId, 
-            RMContainerEventType.LAUNCHED));
+    if (rmContainer == null) {
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
+      return;
+    }
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+      RMContainerEventType.LAUNCHED));
   }
 
   synchronized public void containerCompleted(RMContainer rmContainer,

+ 5 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class CapacityScheduler 
 implements ResourceScheduler, CapacitySchedulerContext {
 
@@ -588,10 +590,12 @@ implements ResourceScheduler, CapacitySchedulerContext {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
           " on node: " + node);
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
       return;
     }
     
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Override

+ 7 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 
 @LimitedPrivate("yarn")
 @Evolving
+@SuppressWarnings("unchecked")
 public class FifoScheduler implements ResourceScheduler {
 
   private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -282,7 +284,6 @@ public class FifoScheduler implements ResourceScheduler {
     return nodes.get(nodeId);
   }
   
-  @SuppressWarnings("unchecked")
   private synchronized void addApplication(ApplicationAttemptId appAttemptId,
       String user) {
     // TODO: Fix store
@@ -655,10 +656,14 @@ public class FifoScheduler implements ResourceScheduler {
       LOG.info("Unknown application: " + applicationAttemptId + 
           " launched container " + containerId +
           " on node: " + node);
+      // Some unknown container sneaked into the system. Kill it.
+      this.rmContext.getDispatcher().getEventHandler()
+        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+
       return;
     }
     
-    application.containerLaunchedOnNode(containerId);
+    application.containerLaunchedOnNode(containerId, node.getNodeID());
   }
 
   @Lock(FifoScheduler.class)

+ 7 - 9
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -39,15 +39,17 @@ public class MockNM {
 
   private int responseId;
   private NodeId nodeId;
-  private final String nodeIdStr;
   private final int memory;
   private final ResourceTrackerService resourceTracker;
   private final int httpPort = 2;
 
   MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
-    this.nodeIdStr = nodeIdStr;
     this.memory = memory;
     this.resourceTracker = resourceTracker;
+    String[] splits = nodeIdStr.split(":");
+    nodeId = Records.newRecord(NodeId.class);
+    nodeId.setHost(splits[0]);
+    nodeId.setPort(Integer.parseInt(splits[1]));
   }
 
   public NodeId getNodeId() {
@@ -63,14 +65,10 @@ public class MockNM {
         new HashMap<ApplicationId, List<ContainerStatus>>();
     conts.put(container.getId().getApplicationAttemptId().getApplicationId(), 
         Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
-    nodeHeartbeat(conts, true,nodeId);
+    nodeHeartbeat(conts, true);
   }
 
   public NodeId registerNode() throws Exception {
-    String[] splits = nodeIdStr.split(":");
-    nodeId = Records.newRecord(NodeId.class);
-    nodeId.setHost(splits[0]);
-    nodeId.setPort(Integer.parseInt(splits[1]));
     RegisterNodeManagerRequest req = Records.newRecord(
         RegisterNodeManagerRequest.class);
     req.setNodeId(nodeId);
@@ -83,11 +81,11 @@ public class MockNM {
   }
 
   public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
+    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
   }
 
   public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, 
-      List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
+      List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -152,13 +152,13 @@ public class MockNodes {
       }
 
       @Override
-      public List<ApplicationId> pullAppsToCleanup() {
+      public List<ApplicationId> getAppsToCleanup() {
         // TODO Auto-generated method stub
         return null;
       }
 
       @Override
-      public List<ContainerId> pullContainersToCleanUp() {
+      public List<ContainerId> getContainersToCleanUp() {
         // TODO Auto-generated method stub
         return null;
       }

+ 145 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java

@@ -19,26 +19,39 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.junit.Test;
-import org.mortbay.log.Log;
 
 public class TestApplicationCleanup {
 
+  private static final Log LOG = LogFactory
+    .getLog(TestApplicationCleanup.class);
+
   @Test
   public void testAppCleanup() throws Exception {
     Logger rootLogger = LogManager.getRootLogger();
@@ -67,11 +80,13 @@ public class TestApplicationCleanup {
     List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
         new ArrayList<ContainerId>()).getAllocatedContainers();
     int contReceived = conts.size();
-    while (contReceived < request) {
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) {
       conts = am.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
       contReceived += conts.size();
-      Log.info("Got " + contReceived + " containers. Waiting to get " + request);
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+               + request);
       Thread.sleep(2000);
     }
     Assert.assertEquals(request, conts.size());
@@ -86,11 +101,12 @@ public class TestApplicationCleanup {
     
     //currently only containers are cleaned via this
     //AM container is cleaned via container launcher
-    while (cleanedConts < 2 || cleanedApps < 1) {
+    waitCount = 0;
+    while ((cleanedConts < 3 || cleanedApps < 1) && waitCount++ < 20) {
       HeartbeatResponse resp = nm1.nodeHeartbeat(true);
       contsToClean = resp.getContainersToCleanupList();
       apps = resp.getApplicationsToCleanupList();
-      Log.info("Waiting to get cleanup events.. cleanedConts: "
+      LOG.info("Waiting to get cleanup events.. cleanedConts: "
           + cleanedConts + " cleanedApps: " + cleanedApps);
       cleanedConts += contsToClean.size();
       cleanedApps += apps.size();
@@ -99,6 +115,130 @@ public class TestApplicationCleanup {
     
     Assert.assertEquals(1, apps.size());
     Assert.assertEquals(app.getApplicationId(), apps.get(0));
+    Assert.assertEquals(1, cleanedApps);
+    Assert.assertEquals(3, cleanedConts);
+
+    rm.stop();
+  }
+
+  @Test
+  public void testContainerCleanup() throws Exception {
+    // Verifies the NM is asked to clean up a released container it still reports RUNNING.
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    final DrainDispatcher dispatcher = new DrainDispatcher();
+    MockRM rm = new MockRM() {
+      @Override
+      protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
+        return new SchedulerEventDispatcher(this.scheduler) {
+          @Override
+          public void handle(SchedulerEvent event) {
+            scheduler.handle(event); // hand the event straight to the scheduler in this call
+          }
+        };
+      }
+
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher; // lets the test call dispatcher.await() between steps
+      }
+    };
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("h1:1234", 5000);
+
+    RMApp app = rm.submitApp(2000);
+
+    // kick the scheduling with a node heartbeat
+    nm1.nodeHeartbeat(true);
+
+    RMAppAttempt attempt = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    
+    // request containers (args presumably host, memory, count, releases - TODO confirm)
+    int request = 2;
+    am.allocate("h1" , 1000, request, 
+        new ArrayList<ContainerId>());
+    dispatcher.await();
+    
+    // kick the scheduler again, then poll allocate() until the containers show up
+    nm1.nodeHeartbeat(true);
+    List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>()).getAllocatedContainers();
+    int contReceived = conts.size();
+    int waitCount = 0;
+    while (contReceived < request && waitCount++ < 20) { // at most 20 polls
+      conts = am.allocate(new ArrayList<ResourceRequest>(),
+          new ArrayList<ContainerId>()).getAllocatedContainers();
+      dispatcher.await();
+      contReceived += conts.size();
+      LOG.info("Got " + contReceived + " containers. Waiting to get "
+               + request);
+      Thread.sleep(2000);
+    }
+    Assert.assertEquals(request, conts.size()); // NOTE(review): checks only the last allocate() response, not contReceived
+
+    // Release the second container of the last allocate() response.
+    ArrayList<ContainerId> release = new ArrayList<ContainerId>();
+    release.add(conts.get(1).getId());
+    am.allocate(new ArrayList<ResourceRequest>(), release);
+    dispatcher.await();
+
+    // Send one more heartbeat with a fake running container. This is to
+    // simulate the situation that can happen if the NM reports that container
+    // is running in the same heartbeat when the RM asks it to clean it up.
+    Map<ApplicationId, List<ContainerStatus>> containerStatuses =
+        new HashMap<ApplicationId, List<ContainerStatus>>();
+    ArrayList<ContainerStatus> containerStatusList =
+        new ArrayList<ContainerStatus>();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+      .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    List<ContainerId> contsToClean = resp.getContainersToCleanupList();
+    int cleanedConts = contsToClean.size();
+    waitCount = 0; // fresh budget: at most 20 plain heartbeats below
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0)); // NOTE(review): looks like this would throw before the assert if polling timed out with an empty list
+    Assert.assertEquals(1, cleanedConts);
+
+    // Now to test the case when RM already gave cleanup, and NM suddenly
+    // realizes that the container is running.
+    LOG.info("Testing container launch much after release and "
+        + "NM getting cleanup");
+    containerStatuses.clear();
+    containerStatusList.clear();
+    containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(1)
+      .getId(), ContainerState.RUNNING, "nothing", 0));
+    containerStatuses.put(app.getApplicationId(), containerStatusList);
+
+    resp = nm1.nodeHeartbeat(containerStatuses, true);
+    dispatcher.await();
+    contsToClean = resp.getContainersToCleanupList();
+    cleanedConts = contsToClean.size();
+    // The cleanup list won't be instantaneous as it is given out by scheduler
+    // and not RMNodeImpl.
+    waitCount = 0; // fresh budget: at most 20 plain heartbeats below
+    while (cleanedConts < 1 && waitCount++ < 20) {
+      resp = nm1.nodeHeartbeat(true);
+      dispatcher.await();
+      contsToClean = resp.getContainersToCleanupList();
+      LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts);
+      cleanedConts += contsToClean.size();
+      Thread.sleep(1000);
+    }
+    LOG.info("Got cleanup for " + contsToClean.get(0)); // NOTE(review): same empty-list risk as the earlier cleanup log line
+    Assert.assertEquals(1, cleanedConts);
 
     rm.stop();
   }

+ 1 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -164,8 +164,7 @@ public class TestResourceTrackerService {
     Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
 
     nodeHeartbeat = nm2.nodeHeartbeat(
-        new HashMap<ApplicationId, List<ContainerStatus>>(), true,
-        recordFactory.newRecordInstance(NodeId.class));
+      new HashMap<ApplicationId, List<ContainerStatus>>(), true);
     Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
     checkRebootedNMCount(rm, ++initialMetricCount);
   }