소스 검색

YARN-5788. Apps not activiated and AM limit resource in UI and REST not updated after -replaceLabelsOnNode (Bibin A Chundatt via Varun Saxena)

Varun Saxena 8 년 전
부모
커밋
7d2d8d25ba

+ 58 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -1126,57 +1126,52 @@ public class CapacityScheduler extends
       writeLock.unlock();
     }
   }
-  
+
   /**
    * Process node labels update on a node.
    */
   private void updateLabelsOnNode(NodeId nodeId,
       Set<String> newLabels) {
-    try {
-      writeLock.lock();
-      FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
-      if (null == node) {
-        return;
-      }
-
-      // Get new partition, we have only one partition per node
-      String newPartition;
-      if (newLabels.isEmpty()) {
-        newPartition = RMNodeLabelsManager.NO_LABEL;
-      } else{
-        newPartition = newLabels.iterator().next();
-      }
+    FiCaSchedulerNode node = nodeTracker.getNode(nodeId);
+    if (null == node) {
+      return;
+    }
 
-      // old partition as well
-      String oldPartition = node.getPartition();
+    // Get new partition, we have only one partition per node
+    String newPartition;
+    if (newLabels.isEmpty()) {
+      newPartition = RMNodeLabelsManager.NO_LABEL;
+    } else{
+      newPartition = newLabels.iterator().next();
+    }
 
-      // Update resources of these containers
-      for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
-        FiCaSchedulerApp application = getApplicationAttempt(
-            rmContainer.getApplicationAttemptId());
-        if (null != application) {
-          application.nodePartitionUpdated(rmContainer, oldPartition,
-              newPartition);
-        } else{
-          LOG.warn("There's something wrong, some RMContainers running on"
-              + " a node, but we cannot find SchedulerApplicationAttempt "
-              + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
-              + rmContainer.getApplicationAttemptId());
-          continue;
-        }
-      }
+    // old partition as well
+    String oldPartition = node.getPartition();
 
-      // Unreserve container on this node
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (null != reservedContainer) {
-        killReservedContainer(reservedContainer);
+    // Update resources of these containers
+    for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
+      FiCaSchedulerApp application = getApplicationAttempt(
+          rmContainer.getApplicationAttemptId());
+      if (null != application) {
+        application.nodePartitionUpdated(rmContainer, oldPartition,
+            newPartition);
+      } else{
+        LOG.warn("There's something wrong, some RMContainers running on"
+            + " a node, but we cannot find SchedulerApplicationAttempt "
+            + "for it. Node=" + node.getNodeID() + " applicationAttemptId="
+            + rmContainer.getApplicationAttemptId());
+        continue;
       }
+    }
 
-      // Update node labels after we've done this
-      node.updateLabels(newLabels);
-    } finally {
-      writeLock.unlock();
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      killReservedContainer(reservedContainer);
     }
+
+    // Update node labels after we've done this
+    node.updateLabels(newLabels);
   }
 
   private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
@@ -1371,13 +1366,8 @@ public class CapacityScheduler extends
     {
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
           (NodeLabelsUpdateSchedulerEvent) event;
-      
-      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
-          .getUpdatedNodeToLabels().entrySet()) {
-        NodeId id = entry.getKey();
-        Set<String> labels = entry.getValue();
-        updateLabelsOnNode(id, labels);
-      }
+
+      updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
     case NODE_UPDATE:
@@ -1482,6 +1472,27 @@ public class CapacityScheduler extends
     }
   }
 
+  /**
+   * Process node labels update.
+   */
+  private void updateNodeLabelsAndQueueResource(
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent) {
+    try {
+      writeLock.lock();
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+      Resource clusterResource = getClusterResource();
+      root.updateClusterResource(clusterResource,
+          new ResourceLimits(clusterResource));
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private void addNode(RMNode nodeManager) {
     try {
       writeLock.lock();

+ 70 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java

@@ -413,7 +413,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test(timeout = 3000000)
+  @Test(timeout = 300000)
   public void testMoveApplicationWithLabel() throws Exception {
     // set node -> label
     mgr.addToCluserNodeLabelsWithDefaultExclusivity(
@@ -589,7 +589,49 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     rm.close();
   }
 
-  @Test (timeout = 60000)
+  @Test
+  public void testAMResourceLimitNodeUpdatePartition() throws Exception {
+    conf.setInt("yarn.scheduler.minimum-allocation-mb", 64);
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    rm.registerNode("h1:1234", 6400);
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(
+        ImmutableSet.of("x", "y", "z"));
+
+    // .1 percentage of 6400 will be for am
+    checkAMResourceLimit(rm, "a", 640, "");
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 640, "x");
+    checkAMResourceLimit(rm, "a", 0, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+
+    // Switch
+    mgr.replaceLabelsOnNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+    rm.drainEvents();
+
+    checkAMResourceLimit(rm, "a", 0, "x");
+    checkAMResourceLimit(rm, "a", 640, "y");
+    checkAMResourceLimit(rm, "a", 0, "z");
+    checkAMResourceLimit(rm, "a", 0, "");
+  }
+
+  @Test(timeout = 60000)
   public void testAMResourceUsageWhenNodeUpdatesPartition()
       throws Exception {
     // set node -> label
@@ -638,8 +680,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
 
     // change h1's label to z
-    cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
-        toSet("z"))));
+    cs.handle(new NodeLabelsUpdateSchedulerEvent(
+        ImmutableMap.of(nm1.getNodeId(), toSet("z"))));
 
     // Now the resources also should change from x to z. Verify AM and normal
     // used resource are successfully changed.
@@ -677,4 +719,28 @@ public class TestCapacitySchedulerNodeLabelUpdate {
 
     rm.close();
   }
+
+  private void checkAMResourceLimit(MockRM rm, String queuename, int memory,
+      String label) throws InterruptedException {
+    Assert.assertEquals(memory,
+        waitForResourceUpdate(rm, queuename, memory, label, 3000L));
+  }
+
+  private long waitForResourceUpdate(MockRM rm, String queuename, long memory,
+      String label, long timeout) throws InterruptedException {
+    long start = System.currentTimeMillis();
+    long memorySize = 0;
+    while (System.currentTimeMillis() - start < timeout) {
+      CapacityScheduler scheduler =
+          (CapacityScheduler) rm.getResourceScheduler();
+      CSQueue queue = scheduler.getQueue(queuename);
+      memorySize =
+          queue.getQueueResourceUsage().getAMLimit(label).getMemorySize();
+      if (memory == memorySize) {
+        return memorySize;
+      }
+      Thread.sleep(100);
+    }
+    return memorySize;
+  }
 }