浏览代码

YARN-1986. In Fifo Scheduler, node heartbeat in between creating app and attempt causes NPE (Hong Zhiguo via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1594479 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 年之前
父节点
当前提交
edb082173f

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -108,6 +108,9 @@ Release 2.4.1 - UNRELEASED
     YARN-1957. Consider the max capacity of the queue when computing the ideal
     capacity for preemption. (Carlo Curino via cdouglas)
 
+    YARN-1986. In Fifo Scheduler, node heartbeat in between creating app and
+    attempt causes NPE (Hong Zhiguo via Sandy Ryza)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -360,7 +360,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
     return nodes.get(nodeId);
   }
 
-  private synchronized void addApplication(ApplicationId applicationId,
+  @VisibleForTesting
+  public synchronized void addApplication(ApplicationId applicationId,
       String queue, String user) {
     SchedulerApplication application =
         new SchedulerApplication(DEFAULT_QUEUE, user);
@@ -372,7 +373,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
         .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
   }
 
-  private synchronized void
+  @VisibleForTesting
+  public synchronized void
       addApplicationAttempt(ApplicationAttemptId appAttemptId,
           boolean transferStateFromPreviousAttempt) {
     SchedulerApplication application =
@@ -458,6 +460,9 @@ public class FifoScheduler extends AbstractYarnScheduler implements
         .entrySet()) {
       FiCaSchedulerApp application =
           (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+      if (application == null) {
+        continue;
+      }
       LOG.debug("pre-assignContainers");
       application.showRequests();
       synchronized (application) {
@@ -497,6 +502,9 @@ public class FifoScheduler extends AbstractYarnScheduler implements
     for (SchedulerApplication application : applications.values()) {
       FiCaSchedulerApp attempt =
           (FiCaSchedulerApp) application.getCurrentAppAttempt();
+      if (attempt == null) {
+        continue;
+      }
       attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
     }
   }

+ 28 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -66,7 +67,7 @@ public class TestFifoScheduler {
   
   private final int GB = 1024;
   private static YarnConfiguration conf;
-  
+
   @BeforeClass
   public static void setup() {
     conf = new YarnConfiguration();
@@ -213,6 +214,32 @@ public class TestFifoScheduler {
     rm.stop();
   }
 
+  @Test
+  public void testNodeUpdateBeforeAppAttemptInit() throws Exception {
+    FifoScheduler scheduler = new FifoScheduler();
+    MockRM rm = new MockRM(conf);
+    scheduler.reinitialize(conf, rm.getRMContext());
+
+    RMNode node = MockNodes.newNodeInfo(1,
+            Resources.createResource(1024, 4), 1, "127.0.0.1");
+    scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    scheduler.addApplication(appId, "queue1", "user1");
+
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    try {
+      scheduler.handle(updateEvent);
+    } catch (NullPointerException e) {
+        Assert.fail();
+    }
+
+    ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 1);
+    scheduler.addApplicationAttempt(attId, false);
+
+    rm.stop();
+  }
+
   private void testMinimumAllocation(YarnConfiguration conf, int testAlloc)
       throws Exception {
     MockRM rm = new MockRM(conf);