Browse Source

YARN-8193. YARN RM hangs abruptly (stops allocating resources) when running successive applications. (Zian Chen via wangda)

Change-Id: Ia83dd2499ee9000b9e09ae5a932f21a13c0ddee6
Wangda Tan 7 years ago
parent
commit
2a0fa50f9d

+ 31 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -179,11 +179,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // This is to make sure non-partitioned-resource-request will prefer
     // to be allocated to non-partitioned nodes
     int missedNonPartitionedRequestSchedulingOpportunity = 0;
+    AppPlacementAllocator appPlacementAllocator =
+        appInfo.getAppPlacementAllocator(schedulerKey);
+    if (null == appPlacementAllocator) {
+      // This is possible when the #pending resource count was decreased by
+      // a different thread.
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, priority,
+          ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+    String requestPartition =
+        appPlacementAllocator.getPrimaryRequestedNodePartition();
+
     // Only do this when request associated with given scheduler key accepts
     // NO_LABEL under RESPECT_EXCLUSIVITY mode
-    if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
-        appInfo.getAppPlacementAllocator(schedulerKey)
-            .getPrimaryRequestedNodePartition())) {
+    if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, requestPartition)) {
       missedNonPartitionedRequestSchedulingOpportunity =
           application.addMissedNonPartitionedRequestSchedulingOpportunity(
               schedulerKey);
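
The hunk above replaces a chained double lookup with a fetch-once pattern: the AppPlacementAllocator is read a single time into a local, null-checked, and only then dereferenced. Below is a minimal standalone sketch of that defensive pattern, with hypothetical names (PlacementTable, Placement, placements) standing in for the scheduler's per-app state; it is an illustration, not code from the patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PlacementTable {
  // Hypothetical stand-in for AppPlacementAllocator.
  static class Placement {
    String primaryPartition() { return ""; } // "" plays the role of NO_LABEL
  }

  // Per-key placement state; another thread removes the entry once the
  // app's pending resources for that key drop to zero.
  private final Map<String, Placement> placements = new ConcurrentHashMap<>();

  /** Returns the requested partition, or null to signal "skip this key". */
  String primaryPartitionOrSkip(String schedulerKey) {
    // Fetch once into a local. Chaining placements.get(key).primaryPartition()
    // would throw NullPointerException if the removal raced in between.
    Placement placement = placements.get(schedulerKey);
    if (placement == null) {
      return null;
    }
    return placement.primaryPartition();
  }
}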
@@ -261,12 +272,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     return result;
   }
   
-  public float getLocalityWaitFactor(
-      SchedulerRequestKey schedulerKey, int clusterNodes) {
+  public float getLocalityWaitFactor(int uniqAsks, int clusterNodes) {
     // Estimate: Required unique resources (i.e. hosts + racks)
-    int requiredResources = Math.max(
-        application.getAppPlacementAllocator(schedulerKey)
-            .getUniqueLocationAsks() - 1, 0);
+    int requiredResources = Math.max(uniqAsks - 1, 0);
     
     // waitFactor can't be more than '1' 
     // i.e. no point skipping more than clustersize opportunities
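
The second hunk narrows getLocalityWaitFactor to take the already-fetched uniqAsks count instead of re-resolving the allocator inside the method. The body below the comments is elided by the hunk; assuming it returns min(requiredResources / clusterNodes, 1), as the "can't be more than '1'" comment implies, a worked sketch:

// Sketch of the refactored method; the return line is reconstructed from
// the comments above and is an assumption, not part of the visible diff.
static float getLocalityWaitFactor(int uniqAsks, int clusterNodes) {
  // Required unique resources (hosts + racks); the -1 drops the ANY ask.
  int requiredResources = Math.max(uniqAsks - 1, 0);
  // No point skipping more than clustersize opportunities.
  return Math.min((float) requiredResources / clusterNodes, 1.0f);
}

// getLocalityWaitFactor(5, 100)   == 0.04f  (4 hosts/racks on 100 nodes)
// getLocalityWaitFactor(1, 100)   == 0.0f   (ANY-only request: no waiting)
// getLocalityWaitFactor(500, 100) == 1.0f   (capped at the cluster size)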
@@ -296,10 +304,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       if (rmContext.getScheduler().getNumClusterNodes() == 0) {
         return false;
       }
+
+      int uniqLocationAsks = 0;
+      AppPlacementAllocator appPlacementAllocator =
+          application.getAppPlacementAllocator(schedulerKey);
+      if (appPlacementAllocator != null) {
+        uniqLocationAsks = appPlacementAllocator.getUniqueLocationAsks();
+      }
       // If we have only ANY requests for this schedulerKey, we should not
       // delay its scheduling.
-      if (application.getAppPlacementAllocator(schedulerKey)
-          .getUniqueLocationAsks() == 1) {
+      if (uniqLocationAsks == 1) {
         return true;
       }
 
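getUniqueLocationAsks() counts the distinct resource names requested under the scheduler key, the ANY ask included, which is why a count of 1 means "ANY-only, locality delay buys nothing" and why the wait-factor math above subtracts one. A hypothetical illustration:

// Example ask sets and the unique-location counts they would produce:
//   {"*"}                            -> 1  ANY only: return true, no delay
//   {"*", "/rack1"}                  -> 2  rack preference: delay may pay off
//   {"*", "/rack1", "host1.example"} -> 3  node + rack preference
// Under the new null guard, a vanished allocator leaves uniqLocationAsks
// at 0, so the == 1 shortcut is skipped and control falls through to the
// wait-factor computation instead of throwing.
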
@@ -313,7 +327,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       } else {
         long requiredContainers =
             application.getOutstandingAsksCount(schedulerKey);
-        float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
+        float localityWaitFactor = getLocalityWaitFactor(uniqLocationAsks,
             rmContext.getScheduler().getNumClusterNodes());
         // Cap the delay by the number of nodes in the cluster.
         return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
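
The return statement is truncated by the hunk, but read together with the visible Math.min it plausibly compares min(clusterNodes, requiredContainers * localityWaitFactor) against the missed scheduling opportunities. A worked example under assumed numbers:

// Assumed comparison (the second operand is cut off in the diff):
//   min(clusterNodes, requiredContainers * localityWaitFactor) < missedOpportunities
// With clusterNodes = 100, requiredContainers = 20, uniqLocationAsks = 5:
//   localityWaitFactor = (5 - 1) / 100f = 0.04f
//   threshold          = min(100, 20 * 0.04f) = 0.8
// so a single missed opportunity (1 > 0.8) already releases this request
// from locality delay on a 100-node cluster.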
@@ -806,6 +820,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
         application.getAppSchedulingInfo().getAppPlacementAllocator(
             schedulerKey);
 
+    // This could be null when the #pending request count was decreased by
+    // another thread.
+    if (schedulingPS == null) {
+      return new ContainerAllocation(reservedContainer, null,
+          AllocationState.QUEUE_SKIPPED);
+    }
+
     result = ContainerAllocation.PRIORITY_SKIPPED;
 
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
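
Taken together, the three null checks turn a racy NullPointerException on the scheduler thread, which previously left the RM unable to allocate anything further, into a skip that is simply retried on a later heartbeat. A compact, hypothetical reproduction of the underlying race (simplified types, not the actual RM code or its tests):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PlacementRaceSketch {
  public static void main(String[] args) throws InterruptedException {
    Map<String, Object> placements = new ConcurrentHashMap<>();
    placements.put("priority-1", new Object());

    // Completion path: pending asks drop to zero, the entry is removed.
    Thread completer = new Thread(() -> placements.remove("priority-1"));

    // Scheduler path: fetch once, then null-check, as the patch does.
    Thread scheduler = new Thread(() -> {
      Object allocator = placements.get("priority-1");
      if (allocator == null) {
        System.out.println("PRIORITY_SKIPPED"); // retry on the next heartbeat
        return;
      }
      System.out.println("allocating with " + allocator);
    });

    completer.start();
    scheduler.start();
    completer.join();
    scheduler.join();
  }
}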