
YARN-4676. Automatic and Asynchronous Decommissioning Nodes Status Tracking. Contributed by Daniel Zhi.
(cherry picked from commit d464483bf7f0b3e3be3ba32cd6c3eee546747ab5)

Conflicts:

hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

Junping Du 8 years ago
parent
commit
0da69c324d
28 changed files with 1326 additions and 219 deletions
  1. +101 -10  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java
  2. +63 -1  hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java
  3. +1 -0  hadoop-project/src/site/site.xml
  4. +5 -0  hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  5. +5 -0  hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  6. +14 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  7. +25 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
  8. +1 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
  9. +96 -70  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
  10. +11 -13  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java
  11. +16 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
  12. +18 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  13. +2 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
  14. +439 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
  15. +123 -43  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
  16. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
  17. +19 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  18. +6 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  19. +41 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java
  20. +34 -20  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  21. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
  22. +5 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  23. +10 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  24. +131 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java
  25. +0 -11  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  26. +157 -42  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  27. +0 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
  28. +1 -1  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md

+ 101 - 10
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/HostsFileReader.java

@@ -21,16 +21,27 @@ package org.apache.hadoop.util;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.Set;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
 
 // Keeps track of which datanodes/tasktrackers are allowed to connect to the 
 // namenode/jobtracker.
@@ -38,7 +49,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 public class HostsFileReader {
   private Set<String> includes;
-  private Set<String> excludes;
+  // exclude host list with optional timeout.
+  // If the value is null, it indicates default timeout.
+  private Map<String, Integer> excludes;
   private String includesFile;
   private String excludesFile;
   private WriteLock writeLock;
@@ -49,7 +62,7 @@ public class HostsFileReader {
   public HostsFileReader(String inFile, 
                          String exFile) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     includesFile = inFile;
     excludesFile = exFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -62,7 +75,7 @@ public class HostsFileReader {
   public HostsFileReader(String includesFile, InputStream inFileInputStream,
       String excludesFile, InputStream exFileInputStream) throws IOException {
     includes = new HashSet<String>();
-    excludes = new HashSet<String>();
+    excludes = new HashMap<String, Integer>();
     this.includesFile = includesFile;
     this.excludesFile = excludesFile;
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -121,6 +134,73 @@ public class HostsFileReader {
     }
   }
 
+  public static void readFileToMap(String type,
+      String filename, Map<String, Integer> map) throws IOException {
+    File file = new File(filename);
+    FileInputStream fis = new FileInputStream(file);
+    readFileToMapWithFileInputStream(type, filename, fis, map);
+  }
+
+  public static void readFileToMapWithFileInputStream(String type,
+      String filename, InputStream inputStream, Map<String, Integer> map)
+          throws IOException {
+    // The input file could be either simple text or XML.
+    boolean xmlInput = filename.toLowerCase().endsWith(".xml");
+    if (xmlInput) {
+      readXmlFileToMapWithFileInputStream(type, filename, inputStream, map);
+    } else {
+      HashSet<String> nodes = new HashSet<String>();
+      readFileToSetWithFileInputStream(type, filename, inputStream, nodes);
+      for (String node : nodes) {
+        map.put(node, null);
+      }
+    }
+  }
+
+  public static void readXmlFileToMapWithFileInputStream(String type,
+      String filename, InputStream fileInputStream, Map<String, Integer> map)
+          throws IOException {
+    Document dom;
+    DocumentBuilderFactory builder = DocumentBuilderFactory.newInstance();
+    try {
+      DocumentBuilder db = builder.newDocumentBuilder();
+      dom = db.parse(fileInputStream);
+      // Examples:
+      // <host><name>host1</name></host>
+      // <host><name>host2</name><timeout>123</timeout></host>
+      // <host><name>host3</name><timeout>-1</timeout></host>
+      // <host><name>host4, host5,host6</name><timeout>1800</timeout></host>
+      Element doc = dom.getDocumentElement();
+      NodeList nodes = doc.getElementsByTagName("host");
+      for (int i = 0; i < nodes.getLength(); i++) {
+        Node node = nodes.item(i);
+        if (node.getNodeType() == Node.ELEMENT_NODE) {
+          Element e= (Element) node;
+          // Support both single host and comma-separated list of hosts.
+          String v = readFirstTagValue(e, "name");
+          String[] hosts = StringUtils.getTrimmedStrings(v);
+          String str = readFirstTagValue(e, "timeout");
+          Integer timeout = (str == null)? null : Integer.parseInt(str);
+          for (String host : hosts) {
+            map.put(host, timeout);
+            LOG.info("Adding a node \"" + host + "\" to the list of "
+                + type + " hosts from " + filename);
+          }
+        }
+      }
+    } catch (IOException|SAXException|ParserConfigurationException e) {
+      LOG.fatal("error parsing " + filename, e);
+      throw new RuntimeException(e);
+    } finally {
+      fileInputStream.close();
+    }
+  }
+
+  static String readFirstTagValue(Element e, String tag) {
+    NodeList nodes = e.getElementsByTagName(tag);
+    return (nodes.getLength() == 0)? null : nodes.item(0).getTextContent();
+  }
+
   public void refresh(String includeFiles, String excludeFiles)
       throws IOException {
     LOG.info("Refreshing hosts (include/exclude) list");
@@ -129,7 +209,7 @@ public class HostsFileReader {
       // update instance variables
       updateFileNames(includeFiles, excludeFiles);
       Set<String> newIncludes = new HashSet<String>();
-      Set<String> newExcludes = new HashSet<String>();
+      Map<String, Integer> newExcludes = new HashMap<String, Integer>();
       boolean switchIncludes = false;
       boolean switchExcludes = false;
       if (includeFiles != null && !includeFiles.isEmpty()) {
@@ -137,7 +217,7 @@ public class HostsFileReader {
         switchIncludes = true;
       }
       if (excludeFiles != null && !excludeFiles.isEmpty()) {
-        readFileToSet("excluded", excludeFiles, newExcludes);
+        readFileToMap("excluded", excludeFiles, newExcludes);
         switchExcludes = true;
       }
 
@@ -161,7 +241,7 @@ public class HostsFileReader {
     this.writeLock.lock();
     try {
       Set<String> newIncludes = new HashSet<String>();
-      Set<String> newExcludes = new HashSet<String>();
+      Map<String, Integer> newExcludes = new HashMap<String, Integer>();
       boolean switchIncludes = false;
       boolean switchExcludes = false;
       if (inFileInputStream != null) {
@@ -170,7 +250,7 @@ public class HostsFileReader {
         switchIncludes = true;
       }
       if (exFileInputStream != null) {
-        readFileToSetWithFileInputStream("excluded", excludesFile,
+        readFileToMapWithFileInputStream("excluded", excludesFile,
             exFileInputStream, newExcludes);
         switchExcludes = true;
       }
@@ -199,7 +279,7 @@ public class HostsFileReader {
   public Set<String> getExcludedHosts() {
     this.readLock.lock();
     try {
-      return excludes;
+      return excludes.keySet();
     } finally {
       this.readLock.unlock();
     }
@@ -209,7 +289,18 @@ public class HostsFileReader {
     this.readLock.lock();
     try {
       includes.addAll(this.includes);
-      excludes.addAll(this.excludes);
+      excludes.addAll(this.excludes.keySet());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void getHostDetails(Set<String> includeHosts,
+                             Map<String, Integer> excludeHosts) {
+    this.readLock.lock();
+    try {
+      includeHosts.addAll(this.includes);
+      excludeHosts.putAll(this.excludes);
     } finally {
       this.readLock.unlock();
     }

+ 63 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestHostsFileReader.java

@@ -20,16 +20,19 @@ package org.apache.hadoop.util;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileWriter;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Map;
 
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.*;
+
 import static org.junit.Assert.*;
 
 /*
  * Test for HostsFileReader.java
- * 
+ *
  */
 public class TestHostsFileReader {
 
@@ -39,6 +42,7 @@ public class TestHostsFileReader {
   File INCLUDES_FILE = new File(HOSTS_TEST_DIR, "dfs.include");
   String excludesFile = HOSTS_TEST_DIR + "/dfs.exclude";
   String includesFile = HOSTS_TEST_DIR + "/dfs.include";
+  private String excludesXmlFile = HOSTS_TEST_DIR + "/dfs.exclude.xml";
 
   @Before
   public void setUp() throws Exception {
@@ -288,4 +292,62 @@ public class TestHostsFileReader {
     assertFalse(hfp.getExcludedHosts().contains("somehost5"));
 
   }
+
+  /*
+   * Test if timeout values are provided in HostFile
+   */
+  @Test
+  public void testHostFileReaderWithTimeout() throws Exception {
+    FileWriter efw = new FileWriter(excludesXmlFile);
+    FileWriter ifw = new FileWriter(includesFile);
+
+    efw.write("<?xml version=\"1.0\"?>\n");
+    efw.write("<!-- yarn.nodes.exclude -->\n");
+    efw.write("<hosts>\n");
+    efw.write("<host><name>host1</name></host>\n");
+    efw.write("<host><name>host2</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>host3</name><timeout>-1</timeout></host>\n");
+    efw.write("<host><name>10000</name></host>\n");
+    efw.write("<host><name>10001</name><timeout>123</timeout></host>\n");
+    efw.write("<host><name>10002</name><timeout>-1</timeout></host>\n");
+    efw.write("<host><name>host4,host5, host6</name>" +
+              "<timeout>1800</timeout></host>\n");
+    efw.write("</hosts>\n");
+    efw.close();
+
+    ifw.write("#Hosts-in-DFS\n");
+    ifw.write("     \n");
+    ifw.write("   somehost \t  somehost2 \n somehost4");
+    ifw.write("   somehost3 \t # somehost5");
+    ifw.close();
+
+    HostsFileReader hfp = new HostsFileReader(includesFile, excludesXmlFile);
+
+    int includesLen = hfp.getHosts().size();
+    int excludesLen = hfp.getExcludedHosts().size();
+    assertEquals(4, includesLen);
+    assertEquals(9, excludesLen);
+
+    Set<String> includes = new HashSet<String>();
+    Map<String, Integer> excludes = new HashMap<String, Integer>();
+    hfp.getHostDetails(includes, excludes);
+    assertTrue(excludes.containsKey("host1"));
+    assertTrue(excludes.containsKey("host2"));
+    assertTrue(excludes.containsKey("host3"));
+    assertTrue(excludes.containsKey("10000"));
+    assertTrue(excludes.containsKey("10001"));
+    assertTrue(excludes.containsKey("10002"));
+    assertTrue(excludes.containsKey("host4"));
+    assertTrue(excludes.containsKey("host5"));
+    assertTrue(excludes.containsKey("host6"));
+    assertTrue(excludes.get("host1") == null);
+    assertTrue(excludes.get("host2") == 123);
+    assertTrue(excludes.get("host3") == -1);
+    assertTrue(excludes.get("10000") == null);
+    assertTrue(excludes.get("10001") == 123);
+    assertTrue(excludes.get("10002") == -1);
+    assertTrue(excludes.get("host4") == 1800);
+    assertTrue(excludes.get("host5") == 1800);
+    assertTrue(excludes.get("host6") == 1800);
+  }
 }

+ 1 - 0
hadoop-project/src/site/site.xml

@@ -136,6 +136,7 @@
       <item name="Secure Containers" href="hadoop-yarn/hadoop-yarn-site/SecureContainer.html"/>
       <item name="Registry" href="hadoop-yarn/hadoop-yarn-site/registry/index.html"/>
       <item name="Reservation System" href="hadoop-yarn/hadoop-yarn-site/ReservationSystem.html"/>
+      <item name="Graceful Decommission" href="hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html"/>
     </menu>
 
     <menu name="YARN REST APIs" inherit="top">

+ 5 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -213,6 +213,11 @@ public class NodeInfo {
     @Override
     public void setUntrackedTimeStamp(long timeStamp) {
     }
+
+    @Override
+    public Integer getDecommissioningTimeout() {
+      return null;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

+ 5 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -202,4 +202,9 @@ public class RMNodeWrapper implements RMNode {
   @Override
   public void setUntrackedTimeStamp(long timeStamp) {
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    return null;
+  }
 }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -795,6 +795,20 @@ public class YarnConfiguration extends Configuration {
    */
   public static final String RM_PROXY_USER_PREFIX = RM_PREFIX + "proxyuser.";
 
+  /**
+   * Timeout in seconds for YARN node graceful decommission.
+   * This is the maximal time to wait for running containers and applications
+   * to complete before transition a DECOMMISSIONING node into DECOMMISSIONED.
+   */
+  public static final String RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT =
+      RM_PREFIX + "nodemanager-graceful-decommission-timeout-secs";
+  public static final int DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT = 3600;
+
+  public static final String RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL =
+      RM_PREFIX + "decommissioning-nodes-watcher.poll-interval-secs";
+  public static final int
+      DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20;
+
   ////////////////////////////////
   // Node Manager Configs
   ////////////////////////////////

+ 25 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java

@@ -43,6 +43,16 @@ public abstract class RefreshNodesRequest {
     return request;
   }
 
+  @Private
+  @Unstable
+  public static RefreshNodesRequest newInstance(
+      DecommissionType decommissionType, Integer timeout) {
+    RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
+    request.setDecommissionType(decommissionType);
+    request.setDecommissionTimeout(timeout);
+    return request;
+  }
+
   /**
    * Set the DecommissionType
    * 
@@ -56,4 +66,18 @@ public abstract class RefreshNodesRequest {
    * @return decommissionType
    */
   public abstract DecommissionType getDecommissionType();
-}
+
+  /**
+   * Set the DecommissionTimeout.
+   *
+   * @param timeout graceful decommission timeout in seconds
+   */
+  public abstract void setDecommissionTimeout(Integer timeout);
+
+  /**
+   * Get the DecommissionTimeout.
+   *
+   * @return decommissionTimeout
+   */
+  public abstract Integer getDecommissionTimeout();
+}
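
As a rough illustration (not from the patch; the import paths are my assumption based on the surrounding code), the new factory method lets an admin client issue a graceful refresh whose timeout is tracked by the ResourceManager:

import java.io.IOException;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;

public final class GracefulRefreshExample {
  // Ask the RM to gracefully decommission the currently excluded nodes,
  // letting the RM itself enforce the timeout (server-side tracking).
  static void refreshGracefully(ResourceManagerAdministrationProtocol admin,
      int timeoutSeconds) throws IOException, YarnException {
    RefreshNodesRequest request = RefreshNodesRequest.newInstance(
        DecommissionType.GRACEFUL, timeoutSeconds);
    admin.refreshNodes(request);
  }
}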

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -37,6 +37,7 @@ message RefreshQueuesResponseProto {
 
 message RefreshNodesRequestProto {
   optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
+  optional int32 decommissionTimeout = 2;
 }
 message RefreshNodesResponseProto {
 }

+ 96 - 70
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java

@@ -99,10 +99,10 @@ public class RMAdminCLI extends HAAdmin {
                   "properties. \n\t\tResourceManager will reload the " +
                   "mapred-queues configuration file."))
           .put("-refreshNodes",
-              new UsageInfo("[-g [timeout in seconds] -client|server]",
+              new UsageInfo("[-g|graceful [timeout in seconds] -client|server]",
               "Refresh the hosts information at the ResourceManager. Here "
-              + "[-g [timeout in seconds] -client|server] is optional, if we "
-              + "specify the timeout then ResourceManager will wait for "
+              + "[-g|graceful [timeout in seconds] -client|server] is optional,"
+              + " if we specify the timeout then ResourceManager will wait for "
               + "timeout before marking the NodeManager as decommissioned."
               + " The -client|server indicates if the timeout tracking should"
               + " be handled by the client or the ResourceManager. The client"
@@ -234,21 +234,23 @@ public class RMAdminCLI extends HAAdmin {
     summary.append("rmadmin is the command to execute YARN administrative " +
         "commands.\n");
     summary.append("The full syntax is: \n\n" +
-    "yarn rmadmin" +
-      " [-refreshQueues]" +
-      " [-refreshNodes [-g [timeout in seconds] -client|server]]" +
-      " [-refreshNodesResources]" +
-      " [-refreshSuperUserGroupsConfiguration]" +
-      " [-refreshUserToGroupsMappings]" +
-      " [-refreshAdminAcls]" +
-      " [-refreshServiceAcl]" +
-      " [-getGroup [username]]" +
-      " [-addToClusterNodeLabels <\"label1(exclusive=true),"
-                  + "label2(exclusive=false),label3\">]" +
-      " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
-      " [-replaceLabelsOnNode <\"node1[:port]=label1,label2 node2[:port]=label1\">]" +
-      " [-directlyAccessNodeLabelStore]" +
-      " [-updateNodeResource [NodeID] [MemSize] [vCores] ([OvercommitTimeout])");
+        "yarn rmadmin" +
+        " [-refreshQueues]" +
+        " [-refreshNodes [-g|graceful [timeout in seconds] -client|server]]" +
+        " [-refreshNodesResources]" +
+        " [-refreshSuperUserGroupsConfiguration]" +
+        " [-refreshUserToGroupsMappings]" +
+        " [-refreshAdminAcls]" +
+        " [-refreshServiceAcl]" +
+        " [-getGroup [username]]" +
+        " [-addToClusterNodeLabels <\"label1(exclusive=true),"
+            + "label2(exclusive=false),label3\">]" +
+        " [-removeFromClusterNodeLabels <label1,label2,label3>]" +
+        " [-replaceLabelsOnNode <\"node1[:port]=label1,label2" +
+        " node2[:port]=label1\">]" +
+        " [-directlyAccessNodeLabelStore]" +
+        " [-updateNodeResource [NodeID] [MemSize] [vCores]" +
+        " ([OvercommitTimeout])");
     if (isHAEnabled) {
       appendHAUsage(summary);
     }
@@ -309,33 +311,40 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
-  private int refreshNodes() throws IOException, YarnException {
+  private int refreshNodes(boolean graceful) throws IOException, YarnException {
     // Refresh the nodes
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
-    RefreshNodesRequest request = RefreshNodesRequest
-        .newInstance(DecommissionType.NORMAL);
+    RefreshNodesRequest request = RefreshNodesRequest.newInstance(
+        graceful? DecommissionType.GRACEFUL : DecommissionType.NORMAL);
     adminProtocol.refreshNodes(request);
     return 0;
   }
 
-  private int refreshNodes(long timeout, String trackingMode)
+  private int refreshNodes(int timeout, String trackingMode)
       throws IOException, YarnException {
-    if (!"client".equals(trackingMode)) {
-      throw new UnsupportedOperationException(
-          "Only client tracking mode is currently supported.");
-    }
+    boolean serverTracking = !"client".equals(trackingMode);
     // Graceful decommissioning with timeout
     ResourceManagerAdministrationProtocol adminProtocol = createAdminProtocol();
     RefreshNodesRequest gracefulRequest = RefreshNodesRequest
-        .newInstance(DecommissionType.GRACEFUL);
+        .newInstance(DecommissionType.GRACEFUL, timeout);
     adminProtocol.refreshNodes(gracefulRequest);
+    if (serverTracking) {
+      return 0;
+    }
     CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest = recordFactory
         .newRecordInstance(CheckForDecommissioningNodesRequest.class);
     long waitingTime;
     boolean nodesDecommissioning = true;
+    // As RM enforces timeout automatically, client usually don't need
+    // to forcefully decommission nodes upon timeout.
+    // Here we let the client waits a small additional seconds so to avoid
+    // unnecessary double decommission.
+    final int gracePeriod = 5;
     // timeout=-1 means wait for all the nodes to be gracefully
     // decommissioned
-    for (waitingTime = 0; waitingTime < timeout || timeout == -1; waitingTime++) {
+    for (waitingTime = 0;
+        timeout == -1 || (timeout >= 0 && waitingTime < timeout + gracePeriod);
+        waitingTime++) {
       // wait for one second to check nodes decommissioning status
       try {
         Thread.sleep(1000);
@@ -380,6 +389,10 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
 
+  private int refreshNodes() throws IOException, YarnException {
+    return refreshNodes(false);
+  }
+
   private int refreshUserToGroupsMappings() throws IOException,
       YarnException {
     // Refresh the user-to-groups mappings
@@ -725,33 +738,12 @@ public class RMAdminCLI extends HAAdmin {
         return exitCode;
       }
     }
-    
+
     try {
       if ("-refreshQueues".equals(cmd)) {
         exitCode = refreshQueues();
       } else if ("-refreshNodes".equals(cmd)) {
-        if (args.length == 1) {
-          exitCode = refreshNodes();
-        } else if (args.length == 3 || args.length == 4) {
-          // if the graceful timeout specified
-          if ("-g".equals(args[1])) {
-            long timeout = -1;
-            String trackingMode;
-            if (args.length == 4) {
-              timeout = validateTimeout(args[2]);
-              trackingMode = validateTrackingMode(args[3]);
-            } else {
-              trackingMode = validateTrackingMode(args[2]);
-            }
-            exitCode = refreshNodes(timeout, trackingMode);
-          } else {
-            printUsage(cmd, isHAEnabled);
-            return -1;
-          }
-        } else {
-          printUsage(cmd, isHAEnabled);
-          return -1;
-        }
+        exitCode = handleRefreshNodes(args, cmd, isHAEnabled);
       } else if ("-refreshNodesResources".equals(cmd)) {
         exitCode = refreshNodesResources();
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
@@ -768,22 +760,7 @@ public class RMAdminCLI extends HAAdmin {
         String[] usernames = Arrays.copyOfRange(args, i, args.length);
         exitCode = getGroups(usernames);
       } else if ("-updateNodeResource".equals(cmd)) {
-        if (args.length < 4 || args.length > 5) {
-          System.err.println("Number of parameters specified for " +
-              "updateNodeResource is wrong.");
-          printUsage(cmd, isHAEnabled);
-          exitCode = -1;
-        } else {
-          String nodeID = args[i++];
-          String memSize = args[i++];
-          String cores = args[i++];
-          int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
-          if (i == args.length - 1) {
-            overCommitTimeout = Integer.parseInt(args[i]);
-          }
-          exitCode = updateNodeResource(nodeID, Integer.parseInt(memSize),
-              Integer.parseInt(cores), overCommitTimeout);
-        }
+        exitCode = handleUpdateNodeResource(args, cmd, isHAEnabled);
       } else if ("-addToClusterNodeLabels".equals(cmd)) {
         if (i >= args.length) {
           System.err.println(NO_LABEL_ERR_MSG);
@@ -843,10 +820,59 @@ public class RMAdminCLI extends HAAdmin {
     return exitCode;
   }
 
-  private long validateTimeout(String strTimeout) {
-    long timeout;
+  // A helper method to reduce the number of lines of run()
+  private int handleRefreshNodes(String[] args, String cmd, boolean isHAEnabled)
+      throws IOException, YarnException {
+    if (args.length == 1) {
+      return refreshNodes();
+    } else if (args.length == 3 || args.length == 4) {
+      // if the graceful timeout specified
+      if ("-g".equals(args[1]) || "-graceful".equals(args[1])) {
+        int timeout = -1;
+        String trackingMode;
+        if (args.length == 4) {
+          timeout = validateTimeout(args[2]);
+          trackingMode = validateTrackingMode(args[3]);
+        } else {
+          trackingMode = validateTrackingMode(args[2]);
+        }
+        return refreshNodes(timeout, trackingMode);
+      } else {
+        printUsage(cmd, isHAEnabled);
+        return -1;
+      }
+    } else {
+      printUsage(cmd, isHAEnabled);
+      return -1;
+    }
+  }
+
+  private int handleUpdateNodeResource(
+      String[] args, String cmd, boolean isHAEnabled)
+          throws NumberFormatException, IOException, YarnException {
+    int i = 1;
+    if (args.length < 4 || args.length > 5) {
+      System.err.println("Number of parameters specified for " +
+          "updateNodeResource is wrong.");
+      printUsage(cmd, isHAEnabled);
+      return -1;
+    } else {
+      String nodeID = args[i++];
+      String memSize = args[i++];
+      String cores = args[i++];
+      int overCommitTimeout = ResourceOption.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT;
+      if (i == args.length - 1) {
+        overCommitTimeout = Integer.parseInt(args[i]);
+      }
+      return updateNodeResource(nodeID, Integer.parseInt(memSize),
+          Integer.parseInt(cores), overCommitTimeout);
+    }
+  }
+
+  private int validateTimeout(String strTimeout) {
+    int timeout;
     try {
-      timeout = Long.parseLong(strTimeout);
+      timeout = Integer.parseInt(strTimeout);
     } catch (NumberFormatException ex) {
       throw new IllegalArgumentException(INVALID_TIMEOUT_ERR_MSG + strTimeout);
     }
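
Example invocations of the reworked command, following the usage string above (the timeout value is illustrative):

# Gracefully decommission the excluded nodes; the RM enforces the 1800 second
# timeout itself (server-side tracking, newly supported by this change).
yarn rmadmin -refreshNodes -g 1800 -server

# No explicit timeout (defaults to -1): the client polls until every node
# finishes decommissioning (client-side tracking).
yarn rmadmin -refreshNodes -graceful -client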

+ 11 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java

@@ -267,7 +267,7 @@ public class TestRMAdminCLI {
         CheckForDecommissioningNodesRequest.class))).thenReturn(response);
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin).refreshNodes(
-        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, 1));
     verify(admin, never()).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
   }
@@ -327,7 +327,7 @@ public class TestRMAdminCLI {
           });
     assertEquals(0, rmAdminCLI.run(args));
     verify(admin, atLeastOnce()).refreshNodes(
-        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL));
+        RefreshNodesRequest.newInstance(DecommissionType.GRACEFUL, -1));
     verify(admin, never()).refreshNodes(
         RefreshNodesRequest.newInstance(DecommissionType.FORCEFUL));
   }
@@ -346,10 +346,6 @@ public class TestRMAdminCLI {
     String[] negativeTimeoutArgs = {"-refreshNodes", "-g", "-1000", "-client"};
     assertEquals(-1, rmAdminCLI.run(negativeTimeoutArgs));
 
-    // server tracking mode
-    String[] serveTrackingrArgs = {"-refreshNodes", "-g", "1", "-server"};
-    assertEquals(-1, rmAdminCLI.run(serveTrackingrArgs));
-
     // invalid tracking mode
     String[] invalidTrackingArgs = {"-refreshNodes", "-g", "1", "-foo"};
     assertEquals(-1, rmAdminCLI.run(invalidTrackingArgs));
@@ -465,8 +461,9 @@ public class TestRMAdminCLI {
       assertTrue(dataOut
           .toString()
           .contains(
-              "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in " +
-              "seconds] -client|server]] [-refreshNodesResources] [-refresh" +
+              "yarn rmadmin [-refreshQueues] [-refreshNodes "+
+              "[-g|graceful [timeout in seconds] -client|server]] " +
+              "[-refreshNodesResources] [-refresh" +
               "SuperUserGroupsConfiguration] [-refreshUserToGroupsMappings] " +
               "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup " +
               "[username]] [-addToClusterNodeLabels " +
@@ -485,7 +482,8 @@ public class TestRMAdminCLI {
       assertTrue(dataOut
           .toString()
           .contains(
-              "-refreshNodes [-g [timeout in seconds] -client|server]: " +
+              "-refreshNodes [-g|graceful [timeout in seconds]" +
+              " -client|server]: " +
               "Refresh the hosts information at the ResourceManager."));
       assertTrue(dataOut
           .toString()
@@ -518,8 +516,8 @@ public class TestRMAdminCLI {
       testError(new String[] { "-help", "-refreshQueues" },
           "Usage: yarn rmadmin [-refreshQueues]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodes" },
-          "Usage: yarn rmadmin [-refreshNodes [-g [timeout in seconds] " +
-          "-client|server]]", dataErr, 0);
+          "Usage: yarn rmadmin [-refreshNodes [-g|graceful " +
+          "[timeout in seconds] -client|server]]", dataErr, 0);
       testError(new String[] { "-help", "-refreshNodesResources" },
           "Usage: yarn rmadmin [-refreshNodesResources]", dataErr, 0);
       testError(new String[] { "-help", "-refreshUserToGroupsMappings" },
@@ -558,8 +556,8 @@ public class TestRMAdminCLI {
       assertEquals(0, rmAdminCLIWithHAEnabled.run(args));
       oldOutPrintStream.println(dataOut);
       String expectedHelpMsg = 
-          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g [timeout in "
-              + "seconds] -client|server]] "
+          "yarn rmadmin [-refreshQueues] [-refreshNodes [-g|graceful "
+              + "[timeout in seconds] -client|server]] "
               + "[-refreshNodesResources] [-refreshSuperUserGroupsConfiguration] "
               + "[-refreshUserToGroupsMappings] "
               + "[-refreshAdminAcls] [-refreshServiceAcl] [-getGroup"

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java

@@ -31,7 +31,6 @@ import com.google.protobuf.TextFormat;
 @Private
 @Unstable
 public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
-
   RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
   RefreshNodesRequestProto.Builder builder = null;
   boolean viaProto = false;
@@ -108,6 +107,22 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
     return convertFromProtoFormat(p.getDecommissionType());
   }
 
+  @Override
+  public synchronized void setDecommissionTimeout(Integer timeout) {
+    maybeInitBuilder();
+    if (timeout != null) {
+      builder.setDecommissionTimeout(timeout);
+    } else {
+      builder.clearDecommissionTimeout();
+    }
+  }
+
+  @Override
+  public synchronized Integer getDecommissionTimeout() {
+    RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
+  }
+
   private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
     return DecommissionType.valueOf(p.name());
   }

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2585,6 +2585,24 @@
     <value>1800000</value>
   </property>
 
+  <property>
+    <description>
+    Timeout in seconds for YARN node graceful decommission.
+    This is the maximal time to wait for running containers and applications to complete
+    before transition a DECOMMISSIONING node into DECOMMISSIONED.
+    </description>
+    <name>yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs</name>
+    <value>3600</value>
+  </property>
+
+  <property>
+    <description>
+    Timeout in seconds of DecommissioningNodesWatcher internal polling.
+    </description>
+    <name>yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs</name>
+    <value>20</value>
+  </property>
+
   <property>
     <description>The Node Label script to run. Script output Line starting with
      "NODE_PARTITION:" will be considered as Node Label Partition. In case of

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -447,7 +447,8 @@ public class AdminService extends CompositeService implements
         rmContext.getNodesListManager().refreshNodes(conf);
         break;
       case GRACEFUL:
-        rmContext.getNodesListManager().refreshNodesGracefully(conf);
+        rmContext.getNodesListManager().refreshNodesGracefully(
+            conf, request.getDecommissionTimeout());
         break;
       case FORCEFUL:
         rmContext.getNodesListManager().refreshNodesForcefully();

+ 439 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java

@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+/**
+ * DecommissioningNodesWatcher is used by ResourceTrackerService to track
+ * DECOMMISSIONING nodes to decide when, after all running containers on
+ * the node have completed, will be transitioned into DECOMMISSIONED state
+ * (NodeManager will be told to shutdown).
+ * Under MR application, a node, after completes all its containers,
+ * may still serve it map output data during the duration of the application
+ * for reducers. A fully graceful mechanism would keep such DECOMMISSIONING
+ * nodes until all involved applications complete. It could be however
+ * undesirable under long-running applications scenario where a bunch
+ * of "idle" nodes would stay around for long period of time.
+ *
+ * DecommissioningNodesWatcher balance such concern with a timeout policy ---
+ * a DECOMMISSIONING node will be DECOMMISSIONED no later than
+ * DECOMMISSIONING_TIMEOUT regardless of running containers or applications.
+ *
+ * To be efficient, DecommissioningNodesWatcher skip tracking application
+ * containers on a particular node before the node is in DECOMMISSIONING state.
+ * It only tracks containers once the node is in DECOMMISSIONING state.
+ * DecommissioningNodesWatcher basically is no cost when no node is
+ * DECOMMISSIONING. This sacrifices the possibility that the node once
+ * host containers of an application that is still running
+ * (the affected map tasks will be rescheduled).
+ */
+public class DecommissioningNodesWatcher {
+  private static final Log LOG =
+      LogFactory.getLog(DecommissioningNodesWatcher.class);
+
+  private final RMContext rmContext;
+
+  // Default timeout value in mills.
+  // Negative value indicates no timeout. 0 means immediate.
+  private long defaultTimeoutMs =
+      1000L * YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT;
+
+  // Once a RMNode is observed in DECOMMISSIONING state,
+  // All its ContainerStatus update are tracked inside DecomNodeContext.
+  class DecommissioningNodeContext {
+    private final NodeId nodeId;
+
+    // Last known NodeState.
+    private NodeState nodeState;
+
+    // The moment node is observed in DECOMMISSIONING state.
+    private final long decommissioningStartTime;
+
+    private long lastContainerFinishTime;
+
+    // number of running containers at the moment.
+    private int numActiveContainers;
+
+    // All applications run on the node at or after decommissioningStartTime.
+    private Set<ApplicationId> appIds;
+
+    // First moment the node is observed in DECOMMISSIONED state.
+    private long decommissionedTime;
+
+    // Timeout in millis for this decommissioning node.
+    // This value could be dynamically updated with new value from RMNode.
+    private long timeoutMs;
+
+    private long lastUpdateTime;
+
+    public DecommissioningNodeContext(NodeId nodeId) {
+      this.nodeId = nodeId;
+      this.appIds = new HashSet<ApplicationId>();
+      this.decommissioningStartTime = mclock.getTime();
+      this.timeoutMs = defaultTimeoutMs;
+    }
+
+    void updateTimeout(Integer timeoutSec) {
+      this.timeoutMs = (timeoutSec == null)?
+          defaultTimeoutMs : (1000L * timeoutSec);
+    }
+  }
+
+  // All DECOMMISSIONING nodes to track.
+  private HashMap<NodeId, DecommissioningNodeContext> decomNodes =
+      new HashMap<NodeId, DecommissioningNodeContext>();
+
+  private Timer pollTimer;
+  private MonotonicClock mclock;
+
+  public DecommissioningNodesWatcher(RMContext rmContext) {
+    this.rmContext = rmContext;
+    pollTimer = new Timer(true);
+    mclock = new MonotonicClock();
+  }
+
+  public void init(Configuration conf) {
+    readDecommissioningTimeout(conf);
+    int v = conf.getInt(
+        YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
+        YarnConfiguration
+          .DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
+    pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
+  }
+
+  /**
+   * Update rmNode decommissioning status based on NodeStatus.
+   * @param rmNode The node
+   * @param remoteNodeStatus latest NodeStatus
+   */
+  public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) {
+    DecommissioningNodeContext context = decomNodes.get(rmNode.getNodeID());
+    long now = mclock.getTime();
+    if (rmNode.getState() == NodeState.DECOMMISSIONED) {
+      if (context == null) {
+        return;
+      }
+      context.nodeState = rmNode.getState();
+      // keep DECOMMISSIONED node for a while for status log, so that such
+      // host will appear as DECOMMISSIONED instead of quietly disappears.
+      if (context.decommissionedTime == 0) {
+        context.decommissionedTime = now;
+      } else if (now - context.decommissionedTime > 60000L) {
+        decomNodes.remove(rmNode.getNodeID());
+      }
+    } else if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+      if (context == null) {
+        context = new DecommissioningNodeContext(rmNode.getNodeID());
+        decomNodes.put(rmNode.getNodeID(), context);
+        context.nodeState = rmNode.getState();
+        context.decommissionedTime = 0;
+      }
+      context.updateTimeout(rmNode.getDecommissioningTimeout());
+      context.lastUpdateTime = now;
+
+      if (remoteNodeStatus.getKeepAliveApplications() != null) {
+        context.appIds.addAll(remoteNodeStatus.getKeepAliveApplications());
+      }
+
+      // Count number of active containers.
+      int numActiveContainers = 0;
+      for (ContainerStatus cs : remoteNodeStatus.getContainersStatuses()) {
+        ContainerState newState = cs.getState();
+        if (newState == ContainerState.RUNNING ||
+            newState == ContainerState.NEW) {
+          numActiveContainers++;
+        }
+        context.numActiveContainers = numActiveContainers;
+        ApplicationId aid = cs.getContainerId()
+            .getApplicationAttemptId().getApplicationId();
+        if (!context.appIds.contains(aid)) {
+          context.appIds.add(aid);
+        }
+      }
+
+      context.numActiveContainers = numActiveContainers;
+
+      // maintain lastContainerFinishTime.
+      if (context.numActiveContainers == 0 &&
+          context.lastContainerFinishTime == 0) {
+        context.lastContainerFinishTime = now;
+      }
+    } else {
+      // remove node in other states
+      if (context != null) {
+        decomNodes.remove(rmNode.getNodeID());
+      }
+    }
+  }
+
+  public synchronized void remove(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context != null) {
+      LOG.info("remove " + nodeId + " in " + context.nodeState);
+      decomNodes.remove(nodeId);
+    }
+  }
+
+  /**
+   * Status about a specific decommissioning node.
+   *
+   */
+  public enum DecommissioningNodeStatus {
+    // Node is not in DECOMMISSIONING state.
+    NONE,
+
+    // wait for running containers to complete
+    WAIT_CONTAINER,
+
+    // wait for running application to complete (after all containers complete);
+    WAIT_APP,
+
+    // Timeout waiting for either containers or applications to complete.
+    TIMEOUT,
+
+    // nothing to wait, ready to be decommissioned
+    READY,
+
+    // The node has already been decommissioned
+    DECOMMISSIONED,
+  }
+
+  public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
+    DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
+    return (s == DecommissioningNodeStatus.READY ||
+            s == DecommissioningNodeStatus.TIMEOUT);
+  }
+
+  public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
+    DecommissioningNodeContext context = decomNodes.get(nodeId);
+    if (context == null) {
+      return DecommissioningNodeStatus.NONE;
+    }
+
+    if (context.nodeState == NodeState.DECOMMISSIONED) {
+      return DecommissioningNodeStatus.DECOMMISSIONED;
+    }
+
+    long waitTime = mclock.getTime() - context.decommissioningStartTime;
+    if (context.numActiveContainers > 0) {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_CONTAINER :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+
+    removeCompletedApps(context);
+    if (context.appIds.size() == 0) {
+      return DecommissioningNodeStatus.READY;
+    } else {
+      return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
+          DecommissioningNodeStatus.WAIT_APP :
+          DecommissioningNodeStatus.TIMEOUT;
+    }
+  }
+
+  /**
+   * PollTimerTask periodically:
+   *   1. log status of all DECOMMISSIONING nodes;
+   *   2. identify and taken care of stale DECOMMISSIONING nodes
+   *      (for example, node already terminated).
+   */
+  class PollTimerTask extends TimerTask {
+    private final RMContext rmContext;
+
+    public PollTimerTask(RMContext rmContext) {
+      this.rmContext = rmContext;
+    }
+
+    public void run() {
+      logDecommissioningNodesStatus();
+      long now = mclock.getTime();
+      Set<NodeId> staleNodes = new HashSet<NodeId>();
+
+      for (Iterator<Map.Entry<NodeId, DecommissioningNodeContext>> it =
+          decomNodes.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<NodeId, DecommissioningNodeContext> e = it.next();
+        DecommissioningNodeContext d = e.getValue();
+        // Skip node recently updated (NM usually updates every second).
+        if (now - d.lastUpdateTime < 5000L) {
+          continue;
+        }
+        // Remove stale non-DECOMMISSIONING node
+        if (d.nodeState != NodeState.DECOMMISSIONING) {
+          LOG.debug("remove " + d.nodeState + " " + d.nodeId);
+          it.remove();
+          continue;
+        } else if (now - d.lastUpdateTime > 60000L) {
+          // Node DECOMMISSIONED could become stale, remove as necessary.
+          RMNode rmNode = getRmNode(d.nodeId);
+          if (rmNode != null &&
+              rmNode.getState() == NodeState.DECOMMISSIONED) {
+            LOG.debug("remove " + rmNode.getState() + " " + d.nodeId);
+            it.remove();
+            continue;
+          }
+        }
+        if (d.timeoutMs >= 0 &&
+            d.decommissioningStartTime + d.timeoutMs < now) {
+          staleNodes.add(d.nodeId);
+          LOG.debug("Identified stale and timeout node " + d.nodeId);
+        }
+      }
+
+      for (NodeId nodeId : staleNodes) {
+        RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+        if (rmNode == null || rmNode.getState() != NodeState.DECOMMISSIONING) {
+          remove(nodeId);
+          continue;
+        }
+        if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+            checkReadyToBeDecommissioned(rmNode.getNodeID())) {
+          LOG.info("DECOMMISSIONING " + nodeId + " timeout");
+          this.rmContext.getDispatcher().getEventHandler().handle(
+              new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+        }
+      }
+    }
+  }
+
+  private RMNode getRmNode(NodeId nodeId) {
+    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
+    if (rmNode == null) {
+      rmNode = this.rmContext.getInactiveRMNodes().get(nodeId);
+    }
+    return rmNode;
+  }
+
+  private void removeCompletedApps(DecommissioningNodeContext context) {
+    Iterator<ApplicationId> it = context.appIds.iterator();
+    while (it.hasNext()) {
+      ApplicationId appId = it.next();
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.debug("Consider non-existing app " + appId + " as completed");
+        it.remove();
+        continue;
+      }
+      if (rmApp.getState() == RMAppState.FINISHED ||
+          rmApp.getState() == RMAppState.FAILED ||
+          rmApp.getState() == RMAppState.KILLED) {
+        LOG.debug("Remove " + rmApp.getState() + " app " + appId);
+        it.remove();
+      }
+    }
+  }
+
+  // Time in second to be decommissioned.
+  private int getTimeoutInSec(DecommissioningNodeContext context) {
+    if (context.nodeState == NodeState.DECOMMISSIONED) {
+      return 0;
+    } else if (context.nodeState != NodeState.DECOMMISSIONING) {
+      return -1;
+    }
+    if (context.appIds.size() == 0 && context.numActiveContainers == 0) {
+      return 0;
+    }
+    // negative timeout value means no timeout (infinite timeout).
+    if (context.timeoutMs < 0) {
+      return -1;
+    }
+
+    long now = mclock.getTime();
+    long timeout = context.decommissioningStartTime + context.timeoutMs - now;
+    return Math.max(0, (int)(timeout / 1000));
+  }
+
+  private void logDecommissioningNodesStatus() {
+    if (!LOG.isDebugEnabled() || decomNodes.size() == 0) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder();
+    long now = mclock.getTime();
+    for (DecommissioningNodeContext d : decomNodes.values()) {
+      DecommissioningNodeStatus s = checkDecommissioningStatus(d.nodeId);
+      sb.append(String.format(
+          "%n  %-34s %4ds fresh:%3ds containers:%2d %14s",
+          d.nodeId.getHost(),
+          (now - d.decommissioningStartTime) / 1000,
+          (now - d.lastUpdateTime) / 1000,
+          d.numActiveContainers,
+          s));
+      if (s == DecommissioningNodeStatus.WAIT_APP ||
+          s == DecommissioningNodeStatus.WAIT_CONTAINER) {
+        sb.append(String.format(" timeout:%4ds", getTimeoutInSec(d)));
+      }
+      for (ApplicationId aid : d.appIds) {
+        sb.append("\n    " + aid);
+        RMApp rmApp = rmContext.getRMApps().get(aid);
+        if (rmApp != null) {
+          sb.append(String.format(
+              " %s %9s %5.2f%% %5ds",
+              rmApp.getState(),
+              (rmApp.getApplicationType() == null)?
+                  "" : rmApp.getApplicationType(),
+              100.0 * rmApp.getProgress(),
+              (mclock.getTime() - rmApp.getStartTime()) / 1000));
+        }
+      }
+    }
+    LOG.info("Decommissioning Nodes: " + sb.toString());
+  }
+
+  // Read possible new DECOMMISSIONING_TIMEOUT_KEY from yarn-site.xml.
+  // This enables DecommissioningNodesWatcher to pick up new value
+  // without ResourceManager restart.
+  private void readDecommissioningTimeout(Configuration conf) {
+    try {
+      if (conf == null) {
+        conf = new YarnConfiguration();
+      }
+      int v = conf.getInt(
+          YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
+          YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
+      if (defaultTimeoutMs != 1000L * v) {
+        defaultTimeoutMs = 1000L * v;
+        LOG.info("Use new decommissioningTimeoutMs: " + defaultTimeoutMs);
+      }
+    } catch (Exception e) {
+      LOG.info("Error readDecommissioningTimeout ", e);
+    }
+  }
+}
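
A minimal sketch (not part of the commit; the wiring is an assumption about how ResourceTrackerService drives the watcher) of the public API defined above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

final class WatcherUsageSketch {
  private final DecommissioningNodesWatcher watcher;

  WatcherUsageSketch(RMContext rmContext, Configuration conf) {
    watcher = new DecommissioningNodesWatcher(rmContext);
    watcher.init(conf);
  }

  // Called on each node heartbeat; returns true once a DECOMMISSIONING node
  // has drained its containers and applications, or has hit its timeout,
  // i.e. it can be transitioned to DECOMMISSIONED.
  boolean onHeartbeat(RMNode rmNode, NodeStatus status) {
    watcher.update(rmNode, status);
    return rmNode.getState() == NodeState.DECOMMISSIONING
        && watcher.checkReadyToBeDecommissioned(rmNode.getNodeID());
  }
}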

+ 123 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -19,14 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.Map;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +41,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -47,14 +52,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @SuppressWarnings("unchecked")
 public class NodesListManager extends CompositeService implements
     EventHandler<NodesListManagerEvent> {
@@ -178,10 +184,11 @@ public class NodesListManager extends CompositeService implements
     if (!LOG.isDebugEnabled()) {
       return;
     }
-    
-    LOG.debug("hostsReader: in=" + conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, 
+
+    LOG.debug("hostsReader: in=" +
+        conf.get(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         YarnConfiguration.DEFAULT_RM_NODES_INCLUDE_FILE_PATH) + " out=" +
-        conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, 
+        conf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH));
 
     Set<String> hostsList = new HashSet<String>();
@@ -196,23 +203,19 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
-  public void refreshNodes(Configuration yarnConf) throws IOException,
-      YarnException {
-    refreshHostsReader(yarnConf);
+  public void refreshNodes(Configuration yarnConf)
+      throws IOException, YarnException {
+    refreshNodes(yarnConf, false);
+  }
 
-    for (NodeId nodeId: rmContext.getRMNodes().keySet()) {
-      if (!isValidNode(nodeId.getHost())) {
-        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
-            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, nodeEventType));
-      }
-    }
-    updateInactiveNodes();
+  public void refreshNodes(Configuration yarnConf, boolean graceful)
+      throws IOException, YarnException {
+    refreshHostsReader(yarnConf, graceful, null);
   }
 
-  private void refreshHostsReader(Configuration yarnConf) throws IOException,
-      YarnException {
+  private void refreshHostsReader(
+      Configuration yarnConf, boolean graceful, Integer timeout)
+          throws IOException, YarnException {
     if (null == yarnConf) {
       yarnConf = new YarnConfiguration();
     }
@@ -222,8 +225,16 @@ public class NodesListManager extends CompositeService implements
     excludesFile =
         yarnConf.get(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
             YarnConfiguration.DEFAULT_RM_NODES_EXCLUDE_FILE_PATH);
+    LOG.info("refreshNodes excludesFile " + excludesFile);
     hostsReader.refresh(includesFile, excludesFile);
     printConfiguredHosts();
+
+    LOG.info("hostsReader include:{" +
+        StringUtils.join(",", hostsReader.getHosts()) +
+        "} exclude:{" +
+        StringUtils.join(",", hostsReader.getExcludedHosts()) + "}");
+
+    handleExcludeNodeList(graceful, timeout);
   }
 
   private void setDecomissionedNMs() {
@@ -237,6 +248,86 @@ public class NodesListManager extends CompositeService implements
     }
   }
 
+  // Handle excluded nodes based on the following rules:
+  // Recommission DECOMMISSIONING nodes that are no longer excluded;
+  // gracefully decommission excluded nodes that are not already
+  // DECOMMISSIONED or DECOMMISSIONING; otherwise take no action beyond
+  // updating the timeout of DECOMMISSIONING nodes when it has changed.
+  private void handleExcludeNodeList(boolean graceful, Integer timeout) {
+    // DECOMMISSIONED/DECOMMISSIONING nodes need to be re-commissioned.
+    List<RMNode> nodesToRecom = new ArrayList<RMNode>();
+
+    // Nodes need to be decommissioned (graceful or forceful);
+    List<RMNode> nodesToDecom = new ArrayList<RMNode>();
+
+    Set<String> includes = new HashSet<String>();
+    Map<String, Integer> excludes = new HashMap<String, Integer>();
+    hostsReader.getHostDetails(includes, excludes);
+
+    for (RMNode n : this.rmContext.getRMNodes().values()) {
+      NodeState s = n.getState();
+      // A node is treated as excluded if it is invalid, either because it
+      // is explicitly excluded or because it is not on the include list.
+      boolean isExcluded = !isValidNode(
+          n.getHostName(), includes, excludes.keySet());
+      String nodeStr = "node " + n.getNodeID() + " with state " + s;
+      if (!isExcluded) {
+        // Note that no action is needed for a DECOMMISSIONED node.
+        if (s == NodeState.DECOMMISSIONING) {
+          LOG.info("Recommission " + nodeStr);
+          nodesToRecom.add(n);
+        }
+        // Otherwise no action is needed.
+      } else {
+        // The node is excluded.
+        if (graceful) {
+          // Use the per-node timeout if it exists, otherwise the request timeout.
+          Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+              excludes.get(n.getHostName()) : timeout;
+          if (s != NodeState.DECOMMISSIONED &&
+              s != NodeState.DECOMMISSIONING) {
+            LOG.info("Gracefully decommission " + nodeStr);
+            nodesToDecom.add(n);
+          } else if (s == NodeState.DECOMMISSIONING &&
+                     !Objects.equals(n.getDecommissioningTimeout(),
+                         timeoutToUse)) {
+            LOG.info("Update " + nodeStr + " timeout to be " + timeoutToUse);
+            nodesToDecom.add(n);
+          } else {
+            LOG.info("No action for " + nodeStr);
+          }
+        } else {
+          if (s != NodeState.DECOMMISSIONED) {
+            LOG.info("Forcefully decommission " + nodeStr);
+            nodesToDecom.add(n);
+          }
+        }
+      }
+    }
+
+    for (RMNode n : nodesToRecom) {
+      RMNodeEvent e = new RMNodeEvent(
+          n.getNodeID(), RMNodeEventType.RECOMMISSION);
+      this.rmContext.getDispatcher().getEventHandler().handle(e);
+    }
+
+    for (RMNode n : nodesToDecom) {
+      RMNodeEvent e;
+      if (graceful) {
+        Integer timeoutToUse = (excludes.get(n.getHostName()) != null)?
+            excludes.get(n.getHostName()) : timeout;
+        e = new RMNodeDecommissioningEvent(n.getNodeID(), timeoutToUse);
+      } else {
+        RMNodeEventType eventType = isUntrackedNode(n.getHostName())?
+            RMNodeEventType.SHUTDOWN : RMNodeEventType.DECOMMISSION;
+        e = new RMNodeEvent(n.getNodeID(), eventType);
+      }
+      this.rmContext.getDispatcher().getEventHandler().handle(e);
+    }
+
+    updateInactiveNodes();
+  }
+
   @VisibleForTesting
   public int getNodeRemovalCheckInterval() {
     return nodeRemovalCheckInterval;
@@ -360,11 +451,15 @@ public class NodesListManager extends CompositeService implements
   }
 
   public boolean isValidNode(String hostName) {
-    String ip = resolver.resolve(hostName);
     Set<String> hostsList = new HashSet<String>();
     Set<String> excludeList = new HashSet<String>();
     hostsReader.getHostDetails(hostsList, excludeList);
+    return isValidNode(hostName, hostsList, excludeList);
+  }
 
+  private boolean isValidNode(
+      String hostName, Set<String> hostsList, Set<String> excludeList) {
+    String ip = resolver.resolve(hostName);
     return (hostsList.isEmpty() || hostsList.contains(hostName) || hostsList
         .contains(ip))
         && !(excludeList.contains(hostName) || excludeList.contains(ip));
@@ -478,29 +573,14 @@ public class NodesListManager extends CompositeService implements
   /**
    * Refresh the nodes gracefully
    *
-   * @param conf
+   * @param yarnConf the configuration holding the include/exclude host lists.
+   * @param timeout decommission timeout in seconds; null means the default timeout.
    * @throws IOException
    * @throws YarnException
    */
-  public void refreshNodesGracefully(Configuration conf) throws IOException,
-      YarnException {
-    refreshHostsReader(conf);
-    for (Entry<NodeId, RMNode> entry : rmContext.getRMNodes().entrySet()) {
-      NodeId nodeId = entry.getKey();
-      if (!isValidNode(nodeId.getHost())) {
-        RMNodeEventType nodeEventType = isUntrackedNode(nodeId.getHost()) ?
-            RMNodeEventType.SHUTDOWN : RMNodeEventType.GRACEFUL_DECOMMISSION;
-        this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeEvent(nodeId, nodeEventType));
-      } else {
-        // Recommissioning the nodes
-        if (entry.getValue().getState() == NodeState.DECOMMISSIONING) {
-          this.rmContext.getDispatcher().getEventHandler()
-              .handle(new RMNodeEvent(nodeId, RMNodeEventType.RECOMMISSION));
-        }
-      }
-    }
-    updateInactiveNodes();
+  public void refreshNodesGracefully(Configuration yarnConf, Integer timeout)
+      throws IOException, YarnException {
+    refreshHostsReader(yarnConf, true, timeout);
   }
 
   /**
@@ -596,4 +676,4 @@ public class NodesListManager extends CompositeService implements
       this.host = hst;
     }
   }
-}
+}
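
Editor's note: for readers skimming handleExcludeNodeList above, the per-node decision can be restated as a side-effect-free rule. The sketch below is illustrative only: ExcludeRuleSketch, the Action enum, and decide() are invented here, and the timeout-update case for nodes already DECOMMISSIONING is folded into a comment.

```java
import org.apache.hadoop.yarn.api.records.NodeState;

// Illustrative restatement of the per-node decision in handleExcludeNodeList.
final class ExcludeRuleSketch {
  enum Action { RECOMMISSION, GRACEFUL_DECOMMISSION, FORCEFUL_DECOMMISSION, NONE }

  static Action decide(NodeState state, boolean excluded, boolean graceful) {
    if (!excluded) {
      // Only nodes currently draining need to be brought back.
      return state == NodeState.DECOMMISSIONING ? Action.RECOMMISSION : Action.NONE;
    }
    if (graceful) {
      // Nodes already DECOMMISSIONED or DECOMMISSIONING are left alone, except
      // that a DECOMMISSIONING node is re-sent the event when its timeout
      // changed (not modeled here).
      return (state == NodeState.DECOMMISSIONED || state == NodeState.DECOMMISSIONING)
          ? Action.NONE : Action.GRACEFUL_DECOMMISSION;
    }
    // Forceful refresh: anything excluded and not yet DECOMMISSIONED goes down
    // (untracked hosts are shut down instead; see isUntrackedNode above).
    return state == NodeState.DECOMMISSIONED ? Action.NONE : Action.FORCEFUL_DECOMMISSION;
  }
}
```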

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java

@@ -70,7 +70,7 @@ public class RMServerUtils {
 
   public static List<RMNode> queryRMNodes(RMContext context,
       EnumSet<NodeState> acceptedStates) {
-    // nodes contains nodes that are NEW, RUNNING OR UNHEALTHY
+    // nodes contains nodes that are NEW, RUNNING, UNHEALTHY or DECOMMISSIONING.
     ArrayList<RMNode> results = new ArrayList<RMNode>();
     if (acceptedStates.contains(NodeState.NEW) ||
         acceptedStates.contains(NodeState.RUNNING) ||
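
Editor's note: since queryRMNodes now also covers DECOMMISSIONING, a caller can list the nodes that are currently draining. A minimal sketch, assuming only the static signature shown in the hunk above; DecommissioningNodeQuery is a hypothetical helper, not RM code.

```java
import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

// Illustrative helper: list nodes draining under graceful decommission.
final class DecommissioningNodeQuery {
  static List<RMNode> decommissioningNodes(RMContext context) {
    return RMServerUtils.queryRMNodes(
        context, EnumSet.of(NodeState.DECOMMISSIONING));
  }
}
```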

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -113,6 +113,8 @@ public class ResourceTrackerService extends AbstractService implements
   private int minAllocMb;
   private int minAllocVcores;
 
+  private DecommissioningNodesWatcher decommissioningWatcher;
+
   private boolean isDistributedNodeLabelsConf;
   private boolean isDelegatedCentralizedNodeLabelsConf;
   private DynamicResourceConfiguration drConf;
@@ -131,6 +133,7 @@ public class ResourceTrackerService extends AbstractService implements
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
+    this.decommissioningWatcher = new DecommissioningNodesWatcher(rmContext);
   }
 
   @Override
@@ -170,6 +173,7 @@ public class ResourceTrackerService extends AbstractService implements
     }
 
     loadDynamicResourceConfiguration(conf);
+    decommissioningWatcher.init(conf);
     super.serviceInit(conf);
   }
 
@@ -494,6 +498,7 @@ public class ResourceTrackerService extends AbstractService implements
 
     // Send ping
     this.nmLivelinessMonitor.receivedPing(nodeId);
+    this.decommissioningWatcher.update(rmNode, remoteNodeStatus);
 
     // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
     NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
@@ -526,6 +531,20 @@ public class ResourceTrackerService extends AbstractService implements
       updateAppCollectorsMap(request);
     }
 
+    // Evaluate whether a DECOMMISSIONING node is ready to be DECOMMISSIONED.
+    if (rmNode.getState() == NodeState.DECOMMISSIONING &&
+        decommissioningWatcher.checkReadyToBeDecommissioned(
+            rmNode.getNodeID())) {
+      String message = "DECOMMISSIONING " + nodeId +
+          " is ready to be decommissioned";
+      LOG.info(message);
+      this.rmContext.getDispatcher().getEventHandler().handle(
+          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
+      this.nmLivelinessMonitor.unregister(nodeId);
+      return YarnServerBuilderUtils.newNodeHeartbeatResponse(
+          NodeAction.SHUTDOWN, message);
+    }
+
     // Heartbeat response
     NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils
         .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -175,4 +175,10 @@ public interface RMNode {
   long getUntrackedTimeStamp();
 
   void setUntrackedTimeStamp(long timeStamp);
+  /**
+   * Optional decommissioning timeout in seconds
+   * (null indicates the default timeout).
+   * @return the decommissioning timeout in seconds.
+   */
+  Integer getDecommissioningTimeout();
 }

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecommissioningEvent.java

@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+/**
+ * RMNode Decommissioning Event.
+ *
+ */
+public class RMNodeDecommissioningEvent extends RMNodeEvent {
+  // Optional decommissioning timeout in seconds.
+  private final Integer decommissioningTimeout;
+
+  // Create an instance with an optional timeout
+  // (a null timeout means the default is used).
+  public RMNodeDecommissioningEvent(NodeId nodeId, Integer timeout) {
+    super(nodeId, RMNodeEventType.GRACEFUL_DECOMMISSION);
+    this.decommissioningTimeout = timeout;
+  }
+
+  public Integer getDecommissioningTimeout() {
+    return this.decommissioningTimeout;
+  }
+}
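
Editor's note: the null-timeout convention above is worth spelling out: a per-node timeout travels with the event, and null defers to the cluster default. A minimal usage sketch; the class name and the 3600-second value are illustrative only.

```java
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent;

public class DecommissioningEventSketch {
  public static void main(String[] args) {
    NodeId nodeId = NodeId.newInstance("host1", 1234);

    // Per-node timeout of 3600 seconds carried by the event.
    RMNodeDecommissioningEvent explicit =
        new RMNodeDecommissioningEvent(nodeId, 3600);

    // A null timeout tells the RM to fall back to the configured default.
    RMNodeDecommissioningEvent useDefault =
        new RMNodeDecommissioningEvent(nodeId, null);

    System.out.println(explicit.getDecommissioningTimeout());   // 3600
    System.out.println(useDefault.getDecommissioningTimeout()); // null
  }
}
```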

+ 34 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -124,6 +125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private String healthReport;
   private long lastHealthReportTime;
   private String nodeManagerVersion;
+  private Integer decommissioningTimeout;
 
   private long timeStamp;
   /* Aggregated resource utilization for the containers. */
@@ -179,7 +181,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                                            NodeState,
                                            RMNodeEventType,
                                            RMNodeEvent>(NodeState.NEW)
-
       //Transitions from NEW state
       .addTransition(NodeState.NEW, NodeState.RUNNING,
           RMNodeEventType.STARTED, new AddNodeTransition())
@@ -265,6 +266,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       .addTransition(NodeState.DECOMMISSIONING, NodeState.REBOOTED,
           RMNodeEventType.REBOOTING,
           new DeactivateNodeTransition(NodeState.REBOOTED))
+      .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
+         RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+         new AddContainersToBeRemovedFromNMTransition())
 
       .addTransition(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONING,
           RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
@@ -633,7 +637,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       } catch (InvalidStateTransitionException e) {
         LOG.error("Can't handle this event at current state", e);
         LOG.error("Invalid event " + event.getType() + 
-            " on Node  " + this.nodeId);
+            " on Node  " + this.nodeId + " oldState " + oldState);
       }
       if (oldState != getState()) {
         LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
@@ -666,6 +670,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case SHUTDOWN:
       metrics.decrNumShutdownNMs();
       break;
+    case DECOMMISSIONING:
+      metrics.decrDecommissioningNMs();
+      break;
     default:
       LOG.debug("Unexpected previous node state");
     }
@@ -712,6 +719,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     case DECOMMISSIONING:
       metrics.decrDecommissioningNMs();
       break;
+    case DECOMMISSIONED:
+      metrics.decrDecommisionedNMs();
+      break;
     case UNHEALTHY:
       metrics.decrNumUnhealthyNMs();
       break;
@@ -1087,9 +1097,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      Integer timeout = null;
+      if (event instanceof RMNodeDecommissioningEvent) {
+        RMNodeDecommissioningEvent e = (RMNodeDecommissioningEvent) event;
+        timeout = e.getDecommissioningTimeout();
+      }
+      // Pick up possible updates on decommissioningTimeout.
+      if (rmNode.getState() == NodeState.DECOMMISSIONING) {
+        if (!Objects.equals(rmNode.getDecommissioningTimeout(), timeout)) {
+          LOG.info("Update " + rmNode.getNodeID() +
+                   " DecommissioningTimeout to be " + timeout);
+          rmNode.decommissioningTimeout = timeout;
+        } else {
+          LOG.info(rmNode.getNodeID() + " is already DECOMMISSIONING");
+        }
+        return;
+      }
       LOG.info("Put Node " + rmNode.nodeId + " in DECOMMISSIONING.");
       // Update NM metrics during graceful decommissioning.
       rmNode.updateMetricsForGracefulDecommission(initState, finalState);
+      rmNode.decommissioningTimeout = timeout;
       if (rmNode.originalTotalCapability == null){
         rmNode.originalTotalCapability =
             Resources.clone(rmNode.totalCapability);
@@ -1156,24 +1183,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           return NodeState.UNHEALTHY;
         }
       }
-      if (isNodeDecommissioning) {
-        List<ApplicationId> runningApps = rmNode.getRunningApps();
-
-        List<ApplicationId> keepAliveApps = statusEvent.getKeepAliveAppIds();
-
-        // no running (and keeping alive) app on this node, get it
-        // decommissioned.
-        // TODO may need to check no container is being scheduled on this node
-        // as well.
-        if ((runningApps == null || runningApps.size() == 0)
-            && (keepAliveApps == null || keepAliveApps.size() == 0)) {
-          RMNodeImpl.deactivateNode(rmNode, NodeState.DECOMMISSIONED);
-          return NodeState.DECOMMISSIONED;
-        }
-
-        // TODO (in YARN-3223) if node in decommissioning, get node resource
-        // updated if container get finished (keep available resource to be 0)
-      }
 
       rmNode.handleContainerStatus(statusEvent.getContainers());
       rmNode.handleReportedIncreasedContainers(
@@ -1472,4 +1481,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public void setUntrackedTimeStamp(long ts) {
     this.timeStamp = ts;
   }
+
+  @Override
+  public Integer getDecommissioningTimeout() {
+    return decommissioningTimeout;
+  }
 }
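
Editor's note: the transition above compares the recorded and requested timeouts with Objects.equals because both are nullable Integers; a bare equals call throws when the receiver is null, and == only compares references for boxed values outside the integer cache. A small standalone illustration (class name invented here):

```java
import java.util.Objects;

public class NullableTimeoutCompare {
  public static void main(String[] args) {
    Integer current = null;    // no per-node timeout recorded yet
    Integer requested = 3600;  // timeout carried by the incoming event

    // Null-safe: works whether either side is null.
    // current.equals(requested) would throw a NullPointerException here,
    // and current == requested only compares object references.
    if (!Objects.equals(current, requested)) {
      System.out.println("Timeout changed; record the new value.");
    }
  }
}
```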

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java

@@ -97,7 +97,7 @@ public class ClusterMetricsInfo {
     this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
     this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
     this.totalNodes = activeNodes + lostNodes + decommissionedNodes
-        + rebootedNodes + unhealthyNodes + shutdownNodes;
+        + rebootedNodes + unhealthyNodes + decommissioningNodes + shutdownNodes;
   }
 
   public int getAppsSubmitted() {

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -272,6 +272,11 @@ public class MockNodes {
     @Override
     public void setUntrackedTimeStamp(long timeStamp) {
     }
+
+    @Override
+    public Integer getDecommissioningTimeout() {
+      return null;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -709,6 +709,9 @@ public class MockRM extends ResourceManager {
   public void waitForState(NodeId nodeId, NodeState finalState)
       throws InterruptedException {
     RMNode node = getRMContext().getRMNodes().get(nodeId);
+    if (node == null) {
+      node = getRMContext().getInactiveRMNodes().get(nodeId);
+    }
     Assert.assertNotNull("node shouldn't be null", node);
     int timeWaiting = 0;
     while (!finalState.equals(node.getState())) {
@@ -722,11 +725,17 @@ public class MockRM extends ResourceManager {
       timeWaiting += WAIT_MS_PER_LOOP;
     }
 
-    System.out.println("Node State is : " + node.getState());
+    System.out.println("Node " + nodeId + " State is : " + node.getState());
     Assert.assertEquals("Node state is not correct (timedout)", finalState,
         node.getState());
   }
 
+  public void sendNodeEvent(MockNM nm, RMNodeEventType event) throws Exception {
+    RMNodeImpl node = (RMNodeImpl)
+        getRMContext().getRMNodes().get(nm.getNodeId());
+    node.handle(new RMNodeEvent(nm.getNodeId(), event));
+  }
+
   public KillApplicationResponse killApp(ApplicationId appId) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     KillApplicationRequest req = KillApplicationRequest.newInstance(appId);

+ 131 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDecommissioningNodesWatcher.java

@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.DecommissioningNodesWatcher.DecommissioningNodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class tests DecommissioningNodesWatcher.
+ */
+public class TestDecommissioningNodesWatcher {
+  private MockRM rm;
+
+  @Test
+  public void testDecommissioningNodesWatcher() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "40");
+
+    rm = new MockRM(conf);
+    rm.start();
+
+    DecommissioningNodesWatcher watcher =
+        new DecommissioningNodesWatcher(rm.getRMContext());
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    RMNode node1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    NodeId id1 = nm1.getNodeId();
+
+    rm.waitForState(id1, NodeState.RUNNING);
+    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+
+    // Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher.
+    rm.sendNodeEvent(nm1, RMNodeEventType.GRACEFUL_DECOMMISSION);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+
+    // Update status with decreasing number of running containers until 0.
+    watcher.update(node1, createNodeStatus(id1, app, 12));
+    watcher.update(node1, createNodeStatus(id1, app, 11));
+    Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
+
+    watcher.update(node1, createNodeStatus(id1, app, 1));
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
+                        watcher.checkDecommissioningStatus(id1));
+
+    watcher.update(node1, createNodeStatus(id1, app, 0));
+    Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
+                        watcher.checkDecommissioningStatus(id1));
+
+    // Finish the app and verify DecommissioningNodeStatus becomes READY.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    Assert.assertEquals(DecommissioningNodeStatus.READY,
+                        watcher.checkDecommissioningStatus(id1));
+  }
+
+  @After
+  public void tearDown() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  private NodeStatus createNodeStatus(
+      NodeId nodeId, RMApp app, int numRunningContainers) {
+    return NodeStatus.newInstance(
+        nodeId, 0, getContainerStatuses(app, numRunningContainers),
+        new ArrayList<ApplicationId>(),
+        NodeHealthStatus.newInstance(
+            true,  "", System.currentTimeMillis() - 1000),
+        null, null, null);
+  }
+
+  // Get mocked ContainerStatus for a batch of containers,
+  // of which numRunningContainers are RUNNING.
+  private List<ContainerStatus> getContainerStatuses(
+      RMApp app, int numRunningContainers) {
+    // Total 12 containers
+    final int total = 12;
+    numRunningContainers = Math.min(total, numRunningContainers);
+    List<ContainerStatus> output = new ArrayList<ContainerStatus>();
+    for (int i = 0; i < total; i++) {
+      ContainerState cstate = (i >= numRunningContainers)?
+          ContainerState.COMPLETE : ContainerState.RUNNING;
+      output.add(ContainerStatus.newInstance(
+          ContainerId.newContainerId(
+              ApplicationAttemptId.newInstance(app.getApplicationId(), i), 1),
+          cstate, "Dummy", 0));
+    }
+    return output;
+  }
+}
+

+ 0 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -254,17 +254,6 @@ public class TestRMNodeTransitions {
         cm.getNumDecommissioningNMs());
     Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
         cm.getNumDecommisionedNMs());
-
-    // Verify node in DECOMMISSIONING will be changed by status update
-    // without running apps
-    statusEvent = getMockRMNodeStatusEventWithoutRunningApps();
-    node.handle(statusEvent);
-    Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
-    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
-    Assert.assertEquals("Decommissioning Nodes", initialDecommissioning - 1,
-        cm.getNumDecommissioningNMs());
-    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned + 1,
-        cm.getNumDecommisionedNMs());
   }
 
   @Test

+ 157 - 42
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
@@ -87,6 +88,9 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   private final static File TEMP_DIR = new File(System.getProperty(
       "test.build.data", "/tmp"), "decommision");
   private final File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
+  private final File excludeHostFile = new File(TEMP_DIR + File.separator +
+      "excludeHostFile.txt");
+
   private MockRM rm;
 
   /**
@@ -221,6 +225,109 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     checkDecommissionedNMCount(rm, metricCount + 1);
   }
 
+  /**
+   * Graceful decommission of a node with no running application.
+   */
+  @Test
+  public void testGracefulDecommissionNoApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 5120);
+    MockNM nm2 = rm.registerNode("host2:5678", 10240);
+    MockNM nm3 = rm.registerNode("host3:4433", 5120);
+
+    int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
+    NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    NodeHeartbeatResponse nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat2.getNodeAction()));
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat3.getNodeAction()));
+
+    rm.waitForState(nm2.getNodeId(), NodeState.RUNNING);
+    rm.waitForState(nm3.getNodeId(), NodeState.RUNNING);
+
+    // Graceful decommission both host2 and host3.
+    writeToHostsFile("host2", "host3");
+    rm.getNodesListManager().refreshNodes(conf, true);
+
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONING);
+
+    nodeHeartbeat1 = nm1.nodeHeartbeat(true);
+    nodeHeartbeat2 = nm2.nodeHeartbeat(true);
+    nodeHeartbeat3 = nm3.nodeHeartbeat(true);
+
+    checkDecommissionedNMCount(rm, metricCount + 2);
+    rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONED);
+    rm.waitForState(nm3.getNodeId(), NodeState.DECOMMISSIONED);
+
+    Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat1.getNodeAction()));
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat2.getNodeAction());
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
+  }
+
+  /**
+   * Graceful decommission of a node with a running application.
+   */
+  @Test
+  public void testGracefulDecommissionWithApp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
+        .getAbsolutePath());
+
+    writeToHostsFile("");
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("host1:1234", 10240);
+    MockNM nm2 = rm.registerNode("host2:5678", 20480);
+    MockNM nm3 = rm.registerNode("host3:4433", 10240);
+    NodeId id1 = nm1.getNodeId();
+    NodeId id3 = nm3.getNodeId();
+    rm.waitForState(id1, NodeState.RUNNING);
+    rm.waitForState(id3, NodeState.RUNNING);
+
+    // Create an app and launch two containers on host1.
+    RMApp app = rm.submitApp(2000);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
+
+    // Graceful decommission host1 and host3
+    writeToHostsFile("host1", "host3");
+    rm.getNodesListManager().refreshNodes(conf, true);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONING);
+
+    // host1 should be DECOMMISSIONING due to running containers.
+    // host3 should become DECOMMISSIONED.
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+    nm3.nodeHeartbeat(true);
+    rm.waitForState(id1, NodeState.DECOMMISSIONING);
+    rm.waitForState(id3, NodeState.DECOMMISSIONED);
+    nm1.nodeHeartbeat(aaid, 2, ContainerState.RUNNING);
+
+    // Complete containers on host1.
+    // Since the app is still RUNNING, expect NodeAction.NORMAL.
+    NodeHeartbeatResponse nodeHeartbeat1 =
+        nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat1.getNodeAction());
+
+    // Finish the app and verify the node becomes DECOMMISSIONED.
+    MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
+    rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
+    nodeHeartbeat1 = nm1.nodeHeartbeat(aaid, 2, ContainerState.COMPLETE);
+    Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat1.getNodeAction());
+    rm.waitForState(id1, NodeState.DECOMMISSIONED);
+  }
+
   /**
   * Decommissioning using a post-configured include hosts file
   */
@@ -1225,19 +1332,17 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService);
     RegisterNodeManagerResponse response = nm1.registerNode();
     Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
+    int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
     writeToHostsFile("host2");
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
     rm.getNodesListManager().refreshNodes(conf);
     NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
     Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
-    int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
-    checkShutdownNMCount(rm, shutdownNMsCount);
     checkDecommissionedNMCount(rm, decommisionedNMsCount);
     request.setNodeId(nm1.getNodeId());
     resourceTrackerService.unRegisterNodeManager(request);
-    shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
-    checkShutdownNMCount(rm, shutdownNMsCount);
+    checkShutdownNMCount(rm, ++shutdownNMsCount);
     checkDecommissionedNMCount(rm, decommisionedNMsCount);
 
     // 1. Register the Node Manager
@@ -1273,8 +1378,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(true);
 
-    File excludeHostFile =
-        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
     writeToHostsFile(excludeHostFile, "host1");
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
         excludeHostFile.getAbsolutePath());
@@ -1300,8 +1403,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals("The inactiveRMNodes should contain an entry for the" +
         "decommissioned node",
         1, rm1.getRMContext().getInactiveRMNodes().size());
-    excludeHostFile =
-        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
     writeToHostsFile(excludeHostFile, "");
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
         excludeHostFile.getAbsolutePath());
@@ -1331,8 +1432,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(true);
     //host3 will not register or heartbeat
-    File excludeHostFile =
-        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
     writeToHostsFile(excludeHostFile, "host3", "host2");
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
         excludeHostFile.getAbsolutePath());
@@ -1364,14 +1463,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     MockNM nm2 = rm.registerNode("host2:5678", 10240);
     nm1.nodeHeartbeat(true);
     nm2.nodeHeartbeat(true);
-    File excludeHostFile =
-        new File(TEMP_DIR + File.separator + "excludeHostFile.txt");
     writeToHostsFile(excludeHostFile, "host3", "host2");
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
         excludeHostFile.getAbsolutePath());
     writeToHostsFile(hostFile, "host1", "host2");
     writeToHostsFile(excludeHostFile, "host1");
-    rm.getNodesListManager().refreshNodesGracefully(conf);
+    rm.getNodesListManager().refreshNodesGracefully(conf, null);
     rm.drainEvents();
     nm1.nodeHeartbeat(true);
     rm.drainEvents();
@@ -1380,7 +1477,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         .getInactiveRMNodes().get(nm1.getNodeId()).getState() == NodeState
         .DECOMMISSIONED);
     writeToHostsFile(excludeHostFile, "");
-    rm.getNodesListManager().refreshNodesGracefully(conf);
+    rm.getNodesListManager().refreshNodesGracefully(conf, null);
     rm.drainEvents();
     Assert.assertTrue("Node " + nm1.getNodeId().getHost() +
         " should be Decommissioned", rm.getRMContext()
@@ -1390,7 +1487,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   }
 
   /**
-   * Remove a node from all lists and check if its forgotten
+   * Remove a node from all lists and check if it is forgotten.
    */
   @Test
   public void testNodeRemovalNormally() throws Exception {
@@ -1411,7 +1508,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   public void refreshNodesOption(boolean doGraceful, Configuration conf)
       throws Exception {
     if (doGraceful) {
-      rm.getNodesListManager().refreshNodesGracefully(conf);
+      rm.getNodesListManager().refreshNodesGracefully(conf, null);
     } else {
       rm.getNodesListManager().refreshNodes(conf);
     }
@@ -1420,8 +1517,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   public void testNodeRemovalUtil(boolean doGraceful) throws Exception {
     Configuration conf = new Configuration();
     int timeoutValue = 500;
-    File excludeHostFile = new File(TEMP_DIR + File.separator +
-        "excludeHostFile.txt");
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, "");
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, "");
     conf.setInt(YarnConfiguration.RM_NODEMANAGER_UNTRACKED_REMOVAL_TIMEOUT_MSEC,
@@ -1455,18 +1550,23 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH, hostFile
         .getAbsolutePath());
     refreshNodesOption(doGraceful, conf);
+    if (doGraceful) {
+      rm.waitForState(nm2.getNodeId(), NodeState.DECOMMISSIONING);
+    }
     nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
     rm.drainEvents();
     Assert.assertTrue("Node should not be in active node list",
         !rmContext.getRMNodes().containsKey(nm2.getNodeId()));
 
     RMNode rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
     Assert.assertEquals("Node should be in inactive node list",
-        rmNode.getState(), NodeState.SHUTDOWN);
+        rmNode.getState(),
+        doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN);
     Assert.assertEquals("Active nodes should be 2",
         metrics.getNumActiveNMs(), 2);
-    Assert.assertEquals("Shutdown nodes should be 1",
-        metrics.getNumShutdownNMs(), 1);
+    Assert.assertEquals("Shutdown nodes should be expected",
+        metrics.getNumShutdownNMs(), doGraceful? 0 : 1);
 
     int nodeRemovalTimeout =
         conf.getInt(
@@ -1491,14 +1591,18 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     rm.drainEvents();
     writeToHostsFile("host1", ip);
     refreshNodesOption(doGraceful, conf);
+    rm.waitForState(nm2.getNodeId(),
+                    doGraceful? NodeState.DECOMMISSIONING : NodeState.SHUTDOWN);
+    nm2.nodeHeartbeat(true);
     rm.drainEvents();
     rmNode = rmContext.getInactiveRMNodes().get(nm2.getNodeId());
     Assert.assertEquals("Node should be shutdown",
-        rmNode.getState(), NodeState.SHUTDOWN);
+        rmNode.getState(),
+        doGraceful? NodeState.DECOMMISSIONED : NodeState.SHUTDOWN);
     Assert.assertEquals("Active nodes should be 2",
         metrics.getNumActiveNMs(), 2);
-    Assert.assertEquals("Shutdown nodes should be 1",
-        metrics.getNumShutdownNMs(), 1);
+    Assert.assertEquals("Shutdown nodes should be expected",
+        metrics.getNumShutdownNMs(), doGraceful? 0 : 1);
 
     //add back the node before timer expires
     latch.await(maxThreadSleeptime - 2000, TimeUnit.MILLISECONDS);
@@ -1542,6 +1646,20 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     }
 
     //Test decommed/ing node that transitions to untracked,timer should remove
+    testNodeRemovalUtilDecomToUntracked(rmContext, conf, nm1, nm2, nm3,
+        maxThreadSleeptime, doGraceful);
+    rm.stop();
+  }
+
+  // A helper method used by testNodeRemovalUtil to avoid exceeding
+  // the maximum allowed method length.
+  private void testNodeRemovalUtilDecomToUntracked(
+      RMContext rmContext, Configuration conf,
+      MockNM nm1, MockNM nm2, MockNM nm3,
+      long maxThreadSleeptime, boolean doGraceful) throws Exception {
+    ClusterMetrics metrics = ClusterMetrics.getMetrics();
+    String ip = NetUtils.normalizeHostName("localhost");
+    CountDownLatch latch = new CountDownLatch(1);
     writeToHostsFile("host1", ip, "host2");
     writeToHostsFile(excludeHostFile, "host2");
     refreshNodesOption(doGraceful, conf);
@@ -1549,7 +1667,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     //nm2.nodeHeartbeat(true);
     nm3.nodeHeartbeat(true);
     latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
-    rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
+    RMNode rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
              rmContext.getInactiveRMNodes().get(nm2.getNodeId());
     Assert.assertNotEquals("Timer for this node was not canceled!",
         rmNode, null);
@@ -1560,6 +1678,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     writeToHostsFile("host1", ip);
     writeToHostsFile(excludeHostFile, "");
     refreshNodesOption(doGraceful, conf);
+    nm2.nodeHeartbeat(true);
     latch.await(maxThreadSleeptime, TimeUnit.MILLISECONDS);
     rmNode = doGraceful ? rmContext.getRMNodes().get(nm2.getNodeId()) :
              rmContext.getInactiveRMNodes().get(nm2.getNodeId());
@@ -1571,16 +1690,12 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         metrics.getNumShutdownNMs(), 0);
     Assert.assertEquals("Active nodes should be 2",
         metrics.getNumActiveNMs(), 2);
-
-    rm.stop();
   }
 
   private void testNodeRemovalUtilLost(boolean doGraceful) throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 2000);
     int timeoutValue = 500;
-    File excludeHostFile = new File(TEMP_DIR + File.separator +
-        "excludeHostFile.txt");
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@@ -1613,7 +1728,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals("All 3 nodes should be active",
         metrics.getNumActiveNMs(), 3);
     int waitCount = 0;
-    while(waitCount ++<20){
+    while(waitCount++ < 20){
       synchronized (this) {
         wait(200);
       }
@@ -1665,8 +1780,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
       throws Exception {
     Configuration conf = new Configuration();
     int timeoutValue = 500;
-    File excludeHostFile = new File(TEMP_DIR + File.separator +
-        "excludeHostFile.txt");
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@@ -1737,8 +1850,6 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
       throws Exception {
     Configuration conf = new Configuration();
     int timeoutValue = 500;
-    File excludeHostFile = new File(TEMP_DIR + File.separator +
-        "excludeHostFile.txt");
     conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
         hostFile.getAbsolutePath());
     conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
@@ -1782,15 +1893,19 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     nm2.nodeHeartbeat(false);
     nm3.nodeHeartbeat(true);
     rm.drainEvents();
-    Assert.assertNotEquals("host2 should be a shutdown NM!",
-        rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
-    Assert.assertEquals("host2 should be a shutdown NM!",
-        rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
-        NodeState.SHUTDOWN);
+    if (!doGraceful) {
+      Assert.assertNotEquals("host2 should be a shutdown NM!",
+          rmContext.getInactiveRMNodes().get(nm2.getNodeId()), null);
+      Assert.assertEquals("host2 should be a shutdown NM!",
+          rmContext.getInactiveRMNodes().get(nm2.getNodeId()).getState(),
+          NodeState.SHUTDOWN);
+    }
     Assert.assertEquals("There should be 2 Active NM!",
         clusterMetrics.getNumActiveNMs(), 2);
-    Assert.assertEquals("There should be 1 Shutdown NM!",
-        clusterMetrics.getNumShutdownNMs(), 1);
+    if (!doGraceful) {
+      Assert.assertEquals("There should be 1 Shutdown NM!",
+          clusterMetrics.getNumShutdownNMs(), 1);
+    }
     Assert.assertEquals("There should be 0 Unhealthy NM!",
         clusterMetrics.getUnhealthyNMs(), 0);
     int nodeRemovalTimeout =
@@ -1818,7 +1933,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
   }
 
   private void writeToHostsFile(String... hosts) throws IOException {
-   writeToHostsFile(hostFile, hosts);
+    writeToHostsFile(hostFile, hosts);
   }
 
   private void writeToHostsFile(File file, String... hosts)

+ 0 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java

@@ -210,8 +210,6 @@ public class TestNMReconnect extends ParameterizedSchedulerTestBase {
     nm1.registerNode();
     rm.waitForState(nm1.getNodeId(), NodeState.RUNNING);
 
-    rm.getRMContext().getNodesListManager().getHostsReader().
-        getExcludedHosts().add("127.0.0.1");
     rm.getRMContext().getDispatcher().getEventHandler().handle(
         new RMNodeEvent(nm1.getNodeId(),
             RMNodeEventType.GRACEFUL_DECOMMISSION));

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnCommands.md

@@ -243,7 +243,7 @@ Usage:
 | COMMAND\_OPTIONS | Description |
 |:---- |:---- |
 | -refreshQueues | Reload the queues' acls, states and scheduler specific properties. ResourceManager will reload the mapred-queues configuration file. |
-| -refreshNodes | Refresh the hosts information at the ResourceManager. |
+| -refreshNodes [-g\|graceful [timeout in seconds] -client\|server] | Refresh the hosts information at the ResourceManager. The -g option indicates graceful decommission of excluded hosts, in which case the optional timeout is the maximum time in seconds the ResourceManager waits before forcefully marking a node as decommissioned. |
 | -refreshNodesResources | Refresh resources of NodeManagers at the ResourceManager. |
 | -refreshSuperUserGroupsConfiguration | Refresh superuser proxy groups mappings. |
 | -refreshUserToGroupsMappings | Refresh user-to-groups mappings. |
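
Editor's note: `yarn rmadmin` is backed by RMAdminCLI, which implements the Tool interface, so the graceful refresh described in the table can also be driven programmatically in a test or script. The sketch below simply mirrors the usage string shown above (`-g` followed by a timeout in seconds); treat the class name and the flag order as an assumption, not the definitive syntax.

```java
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Sketch: rough equivalent of "yarn rmadmin -refreshNodes -g 3600", i.e.
// gracefully decommission excluded hosts with a 3600-second timeout.
public class GracefulRefreshNodesSketch {
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new YarnConfiguration(),
        new RMAdminCLI(), new String[] {"-refreshNodes", "-g", "3600"});
    System.exit(exitCode);
  }
}
```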