Ver Fonte

YARN-2273. NPE in ContinuousScheduling thread when we lose a node. (Wei Yan via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1612721 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla há 11 anos atrás
pai
commit
a24001475c

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -70,6 +70,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2321. NodeManager web UI can incorrectly report Pmem enforcement
     (Leitao Guo via jlowe)
 
+    YARN-2273. NPE in ContinuousScheduling thread when we lose a node. 
+    (Wei Yan via kasha)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 35 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -970,37 +970,27 @@ public class FairScheduler extends
     }
   }
 
-  private void continuousScheduling() {
-    while (true) {
-      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-      // Sort the nodes by space available on them, so that we offer
-      // containers on emptier nodes first, facilitating an even spread. This
-      // requires holding the scheduler lock, so that the space available on a
-      // node doesn't change during the sort.
-      synchronized (this) {
-        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-      }
+  void continuousSchedulingAttempt() {
+    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
+    // Sort the nodes by space available on them, so that we offer
+    // containers on emptier nodes first, facilitating an even spread. This
+    // requires holding the scheduler lock, so that the space available on a
+    // node doesn't change during the sort.
+    synchronized (this) {
+      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
+    }
 
-      // iterate all nodes
-      for (NodeId nodeId : nodeIdList) {
-        if (nodes.containsKey(nodeId)) {
-          FSSchedulerNode node = getFSSchedulerNode(nodeId);
-          try {
-            if (Resources.fitsIn(minimumAllocation,
-                    node.getAvailableResource())) {
-              attemptScheduling(node);
-            }
-          } catch (Throwable ex) {
-            LOG.warn("Error while attempting scheduling for node " + node +
-                    ": " + ex.toString(), ex);
-          }
-        }
-      }
+    // iterate all nodes
+    for (NodeId nodeId : nodeIdList) {
+      FSSchedulerNode node = getFSSchedulerNode(nodeId);
       try {
-        Thread.sleep(getContinuousSchedulingSleepMs());
-      } catch (InterruptedException e) {
-        LOG.warn("Error while doing sleep in continuous scheduling: " +
-                e.toString(), e);
+        if (node != null && Resources.fitsIn(minimumAllocation,
+            node.getAvailableResource())) {
+          attemptScheduling(node);
+        }
+      } catch (Throwable ex) {
+        LOG.error("Error while attempting scheduling for node " + node +
+            ": " + ex.toString(), ex);
       }
     }
   }
@@ -1010,6 +1000,12 @@ public class FairScheduler extends
 
     @Override
     public int compare(NodeId n1, NodeId n2) {
+      if (!nodes.containsKey(n1)) {
+        return 1;
+      }
+      if (!nodes.containsKey(n2)) {
+        return -1;
+      }
       return RESOURCE_CALCULATOR.compare(clusterResource,
               nodes.get(n2).getAvailableResource(),
               nodes.get(n1).getAvailableResource());
@@ -1234,7 +1230,16 @@ public class FairScheduler extends
           new Runnable() {
             @Override
             public void run() {
-              continuousScheduling();
+              while (!Thread.currentThread().isInterrupted()) {
+                try {
+                  continuousSchedulingAttempt();
+                  Thread.sleep(getContinuousSchedulingSleepMs());
+                } catch (InterruptedException e) {
+                  LOG.error("Continuous scheduling thread interrupted. Exiting. ",
+                      e);
+                  return;
+                }
+              }
             }
           }
       );

+ 37 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -2763,7 +2763,43 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     Assert.assertEquals(2, nodes.size());
   }
 
-  
+  @Test
+  public void testContinuousSchedulingWithNodeRemoved() throws Exception {
+    // Disable continuous scheduling, will invoke continuous scheduling once manually
+    scheduler.init(conf);
+    scheduler.start();
+    Assert.assertTrue("Continuous scheduling should be disabled.",
+        !scheduler.isContinuousSchedulingEnabled());
+
+    // Add two nodes
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+    RMNode node2 =
+        MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2,
+            "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+    Assert.assertEquals("We should have two alive nodes.",
+        2, scheduler.getNumClusterNodes());
+
+    // Remove one node
+    NodeRemovedSchedulerEvent removeNode1 = new NodeRemovedSchedulerEvent(node1);
+    scheduler.handle(removeNode1);
+    Assert.assertEquals("We should only have one alive node.",
+        1, scheduler.getNumClusterNodes());
+
+    // Invoke the continuous scheduling once
+    try {
+      scheduler.continuousSchedulingAttempt();
+    } catch (Exception e) {
+      fail("Exception happened when doing continuous scheduling. " +
+        e.toString());
+    }
+  }
+
   @Test
   public void testDontAllowUndeclaredPools() throws Exception{
     conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);