Browse Source

YARN-4344. NMs reconnecting with changed capabilities can lead to wrong cluster resource calculations. Contributed by Varun Vasudev

Jason Lowe 9 years ago
parent
commit
5f05e5e5ba

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -27,6 +27,9 @@ Release 2.6.3 - UNRELEASED
     YARN-4241. Fix typo of property name in yarn-default.xml.
     (Anthony Rojas via aajisaka)
 
+    YARN-4344. NMs reconnecting with changed capabilities can lead to wrong
+    cluster resource calculations (Varun Vasudev via jlowe)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -1177,9 +1177,10 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName, nodeManager.getNodeLabels()));
-    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName, nodeManager.getNodeLabels());
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
+    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
     
@@ -1193,7 +1194,7 @@ public class CapacityScheduler extends
     // update this node to node label manager
     if (labelManager != null) {
       labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
+          schedulerNode.getTotalResource());
     }
   }
 
@@ -1207,7 +1208,7 @@ public class CapacityScheduler extends
     if (node == null) {
       return;
     }
-    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getTotalResource());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.decrementAndGet();
 

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -901,7 +901,7 @@ public class FifoScheduler extends
     this.nodes.remove(nodeInfo.getNodeID());
     
     // Update cluster metrics
-    Resources.subtractFrom(clusterResource, node.getRMNode().getTotalCapability());
+    Resources.subtractFrom(clusterResource, node.getTotalResource());
   }
 
   @Override
@@ -916,9 +916,10 @@ public class FifoScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName));
-    Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+    FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
+        usePortForNodeName);
+    this.nodes.put(nodeManager.getNodeID(), schedulerNode);
+    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
   }
 
   @Override

+ 115 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -67,9 +68,15 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
@@ -77,10 +84,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.Task;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
 import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
@@ -95,6 +105,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -110,6 +121,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -2127,4 +2139,107 @@ public class TestCapacityScheduler {
         .getUser(userName).getActiveApplications());
     rm.stop();
   }
+
+  private class SleepHandler implements EventHandler<SchedulerEvent> {
+    boolean sleepFlag = false;
+    int sleepTime = 20;
+
+    @Override
+    public void handle(SchedulerEvent event) {
+      try {
+        if (sleepFlag) {
+          Thread.sleep(sleepTime);
+        }
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+
+  private ResourceTrackerService getPrivateResourceTrackerService(
+      Dispatcher privateDispatcher, SleepHandler sleepHandler) {
+
+    Configuration conf = new Configuration();
+    ResourceTrackerService privateResourceTrackerService;
+
+    RMContext privateContext =
+        new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
+            null, null, null);
+    privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
+
+    privateDispatcher.register(SchedulerEventType.class, sleepHandler);
+    privateDispatcher.register(SchedulerEventType.class,
+        resourceManager.getResourceScheduler());
+    privateDispatcher.register(RMNodeEventType.class,
+        new ResourceManager.NodeEventDispatcher(privateContext));
+    ((Service) privateDispatcher).init(conf);
+    ((Service) privateDispatcher).start();
+    NMLivelinessMonitor nmLivelinessMonitor =
+        new NMLivelinessMonitor(privateDispatcher);
+    nmLivelinessMonitor.init(conf);
+    nmLivelinessMonitor.start();
+    NodesListManager nodesListManager = new NodesListManager(privateContext);
+    nodesListManager.init(conf);
+    RMContainerTokenSecretManager containerTokenSecretManager =
+        new RMContainerTokenSecretManager(conf);
+    containerTokenSecretManager.start();
+    NMTokenSecretManagerInRM nmTokenSecretManager =
+        new NMTokenSecretManagerInRM(conf);
+    nmTokenSecretManager.start();
+    privateResourceTrackerService =
+        new ResourceTrackerService(privateContext, nodesListManager,
+            nmLivelinessMonitor, containerTokenSecretManager,
+            nmTokenSecretManager);
+    privateResourceTrackerService.init(conf);
+    privateResourceTrackerService.start();
+    resourceManager.getResourceScheduler().setRMContext(privateContext);
+    return privateResourceTrackerService;
+  }
+
+  /**
+   * Test the behaviour of the capacity scheduler when a node reconnects
+   * with changed capabilities. This test is to catch any race conditions
+   * that might occur due to the use of the RMNode object.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNodemanagerReconnect() throws Exception {
+
+    DrainDispatcher privateDispatcher = new DrainDispatcher();
+    SleepHandler sleepHandler = new SleepHandler();
+    ResourceTrackerService privateResourceTrackerService =
+        getPrivateResourceTrackerService(privateDispatcher, sleepHandler);
+
+    // Register node1
+    String hostname1 = "localhost1";
+    Resource capability = BuilderUtils.newResource(4096, 4);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+    RegisterNodeManagerRequest request1 =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
+    request1.setNodeId(nodeId1);
+    request1.setHttpPort(0);
+    request1.setResource(capability);
+    privateResourceTrackerService.registerNodeManager(request1);
+    privateDispatcher.await();
+    Resource clusterResource =
+        resourceManager.getResourceScheduler().getClusterResource();
+    Assert.assertEquals("Initial cluster resources don't match", capability,
+        clusterResource);
+
+    Resource newCapability = BuilderUtils.newResource(1024, 1);
+    RegisterNodeManagerRequest request2 =
+        recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
+    request2.setNodeId(nodeId1);
+    request2.setHttpPort(0);
+    request2.setResource(newCapability);
+    // hold up the disaptcher and register the same node with lower capability
+    sleepHandler.sleepFlag = true;
+    privateResourceTrackerService.registerNodeManager(request2);
+    privateDispatcher.await();
+    Assert.assertEquals("Cluster resources don't match", newCapability,
+        resourceManager.getResourceScheduler().getClusterResource());
+    privateResourceTrackerService.stop();
+  }
 }