소스 검색

YARN-5837. NPE when getting node status of a decommissioned node after an RM restart. Contributed by Robert Kanter
(cherry picked from commit 6bb741ff0ef208a8628bc64d6537999d4cd67955)

Jason Lowe 8 년 전
부모
커밋
4cb4e4c0a1

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -230,7 +231,8 @@ public class NodesListManager extends CompositeService implements
     for (final String host : excludeList) {
       NodeId nodeId = createUnknownNodeId(host);
       RMNodeImpl rmNode = new RMNodeImpl(nodeId,
-          rmContext, host, -1, -1, new UnknownNode(host), null, null);
+          rmContext, host, -1, -1, new UnknownNode(host),
+          Resource.newInstance(0, 0), "unknown");
       rmContext.getInactiveRMNodes().put(nodeId, rmNode);
       rmNode.handle(new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
     }
@@ -595,4 +597,4 @@ public class NodesListManager extends CompositeService implements
       this.host = hst;
     }
   }
-}
+}

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java

@@ -247,4 +247,8 @@ public class MockNM {
   public int getvCores() {
     return vCores;
   }
+
+  public String getVersion() { // lets tests read this mock NM's version value
+    return version;
+  }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -100,6 +101,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -1890,6 +1892,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       rm1.start();
       MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
       MockNM nm2 = rm1.registerNode("host2:1234", 8000);
+      Resource expectedCapability =
+          Resource.newInstance(nm1.getMemory(), nm1.getvCores());
+      String expectedVersion = nm1.getVersion();
       Assert
           .assertEquals(0,
               ClusterMetrics.getMetrics().getNumDecommisionedNMs());
@@ -1911,6 +1916,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       Assert
           .assertEquals(2,
               ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+      verifyNodesAfterDecom(rm1, 2, expectedCapability, expectedVersion);
       rm1.stop();
       rm1 = null;
       Assert
@@ -1924,6 +1930,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       Assert
           .assertEquals(2,
               ClusterMetrics.getMetrics().getNumDecommisionedNMs());
+      verifyNodesAfterDecom(rm2, 2, Resource.newInstance(0, 0), "unknown");
     } finally {
       if (rm1 != null) {
         rm1.stop();
@@ -1934,6 +1941,18 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
     }
   }
 
+  private void verifyNodesAfterDecom(MockRM rm, int numNodes, // asserts on rm's inactive nodes
+                                     Resource expectedCapability,
+                                     String expectedVersion) {
+    ConcurrentMap<NodeId, RMNode> inactiveRMNodes =
+        rm.getRMContext().getInactiveRMNodes(); // map read from rm's RMContext
+    Assert.assertEquals(numNodes, inactiveRMNodes.size()); // exact entry count
+    for (RMNode rmNode : inactiveRMNodes.values()) { // every entry must match
+      Assert.assertEquals(expectedCapability, rmNode.getTotalCapability());
+      Assert.assertEquals(expectedVersion, rmNode.getNodeManagerVersion());
+    }
+  }
+
   // Test Delegation token is renewed synchronously so that recover events
   // can be processed before any other external incoming events, specifically
   // the ContainerFinished event on NM re-registraton.