Przeglądaj źródła

YARN-4677. RMNodeResourceUpdateEvent update from scheduler can lead to race condition (wilfreds and gphillips via rkanter)

(cherry picked from commit 0cd145a44390bc1a01113dce4be4e629637c3e8a)
Robert Kanter 7 lat temu
rodzic
commit
d5e6d0d5f4

+ 12 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -1088,12 +1088,16 @@ public abstract class AbstractYarnScheduler
     }
     }
 
 
     // Process new container information
     // Process new container information
+    // NOTICE: it is possible to not find the NodeID as a node can be
+    // decommissioned at the same time. Skip updates if node is null.
     SchedulerNode schedulerNode = getNode(nm.getNodeID());
     SchedulerNode schedulerNode = getNode(nm.getNodeID());
     List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
     List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
         schedulerNode);
         schedulerNode);
 
 
     // Notify Scheduler Node updated.
     // Notify Scheduler Node updated.
-    schedulerNode.notifyNodeUpdate();
+    if (schedulerNode != null) {
+      schedulerNode.notifyNodeUpdate();
+    }
 
 
     // Process completed containers
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
     Resource releasedResources = Resource.newInstance(0, 0);
@@ -1103,9 +1107,7 @@ public abstract class AbstractYarnScheduler
     // If the node is decommissioning, send an update to have the total
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
     // resource equal to the used resource, so no available resource to
     // schedule.
     // schedule.
-    // TODO YARN-5128: Fix possible race-condition when request comes in before
-    // update is propagated
-    if (nm.getState() == NodeState.DECOMMISSIONING) {
+    if (nm.getState() == NodeState.DECOMMISSIONING && schedulerNode != null) {
       this.rmContext
       this.rmContext
           .getDispatcher()
           .getDispatcher()
           .getEventHandler()
           .getEventHandler()
@@ -1115,13 +1117,16 @@ public abstract class AbstractYarnScheduler
     }
     }
 
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
-    updateNodeResourceUtilization(nm, schedulerNode);
+    if (schedulerNode != null) {
+      updateNodeResourceUtilization(nm, schedulerNode);
+    }
 
 
     // Now node data structures are up-to-date and ready for scheduling.
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
     if(LOG.isDebugEnabled()) {
       LOG.debug(
       LOG.debug(
-          "Node being looked for scheduling " + nm + " availableResource: "
-              + schedulerNode.getUnallocatedResource());
+          "Node being looked for scheduling " + nm + " availableResource: " +
+              (schedulerNode == null ? "unknown (decommissioned)" :
+                  schedulerNode.getUnallocatedResource()));
     }
     }
   }
   }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -976,7 +976,7 @@ public class FairScheduler extends
         return;
         return;
       }
       }
 
 
-      final NodeId nodeID = node.getNodeID();
+      final NodeId nodeID = (node != null ? node.getNodeID() : null);
       if (!nodeTracker.exists(nodeID)) {
       if (!nodeTracker.exists(nodeID)) {
         // The node might have just been removed while this thread was waiting
         // The node might have just been removed while this thread was waiting
         // on the synchronized lock before it entered this synchronized method
         // on the synchronized lock before it entered this synchronized method

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -961,8 +961,10 @@ public class FifoScheduler extends
       return;
       return;
     }
     }
 
 
-    if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
-        node.getUnallocatedResource(), minimumAllocation)) {
+    // A decommissioned node might be removed before we get here
+    if (node != null &&
+        Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
+            node.getUnallocatedResource(), minimumAllocation)) {
       LOG.debug("Node heartbeat " + nm.getNodeID() +
       LOG.debug("Node heartbeat " + nm.getNodeID() +
           " available resource = " + node.getUnallocatedResource());
           " available resource = " + node.getUnallocatedResource());
 
 

+ 32 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -279,14 +279,12 @@ public class TestCapacityScheduler {
     }
     }
   }
   }
 
 
-  private NodeManager
-      registerNode(String hostName, int containerManagerPort, int httpPort,
-          String rackName, Resource capability)
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
           throws IOException, YarnException {
           throws IOException, YarnException {
-    NodeManager nm =
-        new NodeManager(
-            hostName, containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
         new NodeAddedSchedulerEvent(resourceManager.getRMContext()
             .getRMNodes().get(nm.getNodeId()));
             .getRMNodes().get(nm.getNodeId()));
@@ -301,13 +299,13 @@ public class TestCapacityScheduler {
 
 
     // Register node1
     // Register node1
     String host_0 = "host_0";
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
+    NodeManager nm_0 =
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
         registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(4 * GB, 1));
             Resources.createResource(4 * GB, 1));
 
 
     // Register node2
     // Register node2
     String host_1 = "host_1";
     String host_1 = "host_1";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_1 =
+    NodeManager nm_1 =
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
         registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
             Resources.createResource(2 * GB, 1));
             Resources.createResource(2 * GB, 1));
 
 
@@ -4080,6 +4078,29 @@ public class TestCapacityScheduler {
       Assert.fail("Cannot find RMContainer");
       Assert.fail("Cannot find RMContainer");
     }
     }
   }
   }
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // force remove the node to simulate race condition
+    ((CapacityScheduler) resourceManager.getResourceScheduler()).getNodeTracker().
+        removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
 
 
   @Test
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
   public void testResourceUpdateDecommissioningNode() throws Exception {
@@ -4106,9 +4127,8 @@ public class TestCapacityScheduler {
     ((AsyncDispatcher) mockDispatcher).start();
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     // Register node
     String host_0 = "host_0";
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
     Priority priority_0 = Priority.newInstance(0);
 
 

+ 33 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -4967,6 +4968,30 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .get(attId3.getApplicationId()).getQueue());
         .get(attId3.getApplicationId()).getQueue());
   }
   }
 
 
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FairScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
   @Test
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -4992,9 +5017,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     ((AsyncDispatcher) mockDispatcher).start();
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     // Register node
     String host_0 = "host_0";
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
 
 
     RMNode node =
     RMNode node =
         resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
         resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
@@ -5032,13 +5056,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     Assert.assertEquals(availableResource.getVirtualCores(), 0);
     Assert.assertEquals(availableResource.getVirtualCores(), 0);
   }
   }
 
 
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
-      String hostName, int containerManagerPort, int httpPort, String rackName,
-      Resource capability) throws IOException, YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, httpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int httpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort, httpPort,
+        rackName, capability, resourceManager);
 
 
     // after YARN-5375, scheduler event is processed in rm main dispatcher,
     // after YARN-5375, scheduler event is processed in rm main dispatcher,
     // wait it processed, or may lead dead lock
     // wait it processed, or may lead dead lock

+ 33 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
@@ -137,14 +138,12 @@ public class TestFifoScheduler {
     resourceManager.stop();
     resourceManager.stop();
   }
   }
   
   
-  private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
-      registerNode(String hostName, int containerManagerPort, int nmHttpPort,
-          String rackName, Resource capability) throws IOException,
-          YarnException {
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
-        new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
-            containerManagerPort, nmHttpPort, rackName, capability,
-            resourceManager);
+  private NodeManager registerNode(String hostName, int containerManagerPort,
+                                   int nmHttpPort, String rackName,
+                                   Resource capability)
+      throws IOException, YarnException {
+    NodeManager nm = new NodeManager(hostName, containerManagerPort,
+        nmHttpPort, rackName, capability, resourceManager);
     NodeAddedSchedulerEvent nodeAddEvent1 =
     NodeAddedSchedulerEvent nodeAddEvent1 =
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
         new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
             .get(nm.getNodeId()));
             .get(nm.getNodeId()));
@@ -1190,6 +1189,30 @@ public class TestFifoScheduler {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  @Test
+  public void testRemovedNodeDecomissioningNode() throws Exception {
+    // Register nodemanager
+    NodeManager nm = registerNode("host_decom", 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
+
+    RMNode node =
+        resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
+    // Send a heartbeat to kick the tires on the Scheduler
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    resourceManager.getResourceScheduler().handle(nodeUpdate);
+
+    // Force remove the node to simulate race condition
+    ((FifoScheduler) resourceManager.getResourceScheduler())
+        .getNodeTracker().removeNode(nm.getNodeId());
+    // Kick off another heartbeat with the node state mocked to decommissioning
+    RMNode spyNode =
+        Mockito.spy(resourceManager.getRMContext().getRMNodes()
+            .get(nm.getNodeId()));
+    when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
+    resourceManager.getResourceScheduler().handle(
+        new NodeUpdateSchedulerEvent(spyNode));
+  }
+
   @Test
   @Test
   public void testResourceUpdateDecommissioningNode() throws Exception {
   public void testResourceUpdateDecommissioningNode() throws Exception {
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
     // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
@@ -1215,9 +1238,8 @@ public class TestFifoScheduler {
     ((AsyncDispatcher) mockDispatcher).start();
     ((AsyncDispatcher) mockDispatcher).start();
     // Register node
     // Register node
     String host_0 = "host_0";
     String host_0 = "host_0";
-    org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
-        registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
-            Resources.createResource(8 * GB, 4));
+    NodeManager nm_0 = registerNode(host_0, 1234, 2345,
+        NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4));
     // ResourceRequest priorities
     // ResourceRequest priorities
     Priority priority_0 = Priority.newInstance(0);
     Priority priority_0 = Priority.newInstance(0);