瀏覽代碼

YARN-3354. Add node label expression in ContainerTokenIdentifier to support RM recovery. Contributed by Wangda Tan

Jian He 10 年之前
父節點
當前提交
1b89a3e173
共有 17 個文件被更改,包括 408 次插入27 次删除
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 19 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
  3. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
  4. 21 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java
  5. 20 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java
  6. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  7. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  8. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  9. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
  10. 22 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  12. 10 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  13. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  14. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
  15. 10 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  16. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
  17. 282 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -81,6 +81,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3326. Support RESTful API for getLabelsToNodes. (Naganarasimha G R
     via ozawa)
 
+    YARN-3354. Add node label expression in ContainerTokenIdentifier to support
+    RM recovery. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 19 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
 
 import com.google.protobuf.TextFormat;
@@ -64,13 +65,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
       String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
       int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
     this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
-        rmIdentifier, priority, creationTime, null);
+        rmIdentifier, priority, creationTime, null,
+        CommonNodeLabelsManager.NO_LABEL);
   }
 
   public ContainerTokenIdentifier(ContainerId containerID, String hostName,
       String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
       long rmIdentifier, Priority priority, long creationTime,
-      LogAggregationContext logAggregationContext) {
+      LogAggregationContext logAggregationContext, String nodeLabelExpression) {
     ContainerTokenIdentifierProto.Builder builder = 
         ContainerTokenIdentifierProto.newBuilder();
     if (containerID != null) {
@@ -93,6 +95,11 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
       builder.setLogAggregationContext(
           ((LogAggregationContextPBImpl)logAggregationContext).getProto());
     }
+    
+    if (nodeLabelExpression != null) {
+      builder.setNodeLabelExpression(nodeLabelExpression);
+    }
+    
     proto = builder.build();
   }
 
@@ -186,6 +193,16 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
     return UserGroupInformation.createRemoteUser(
         containerId);
   }
+  
+  /**
+   * Get the node-label-expression in the original ResourceRequest
+   */
+  public String getNodeLabelExpression() {
+    if (proto.hasNodeLabelExpression()) {
+      return proto.getNodeLabelExpression();
+    }
+    return CommonNodeLabelsManager.NO_LABEL;
+  }
 
   // TODO: Needed?
   @InterfaceAudience.Private

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto

@@ -49,6 +49,7 @@ message ContainerTokenIdentifierProto {
   optional PriorityProto priority = 8;
   optional int64 creationTime = 9;
   optional LogAggregationContextProto logAggregationContext = 10;
+  optional string nodeLabelExpression = 11;
 }
 
 message ClientToAMTokenIdentifierProto {

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -31,11 +32,21 @@ import org.apache.hadoop.yarn.util.Records;
  * inside YARN and by end-users.
  */
 public abstract class NMContainerStatus {
-
+  
+  // Used by tests only
   public static NMContainerStatus newInstance(ContainerId containerId,
       ContainerState containerState, Resource allocatedResource,
       String diagnostics, int containerExitStatus, Priority priority,
       long creationTime) {
+    return newInstance(containerId, containerState, allocatedResource,
+        diagnostics, containerExitStatus, priority, creationTime,
+        CommonNodeLabelsManager.NO_LABEL);
+  }
+
+  public static NMContainerStatus newInstance(ContainerId containerId,
+      ContainerState containerState, Resource allocatedResource,
+      String diagnostics, int containerExitStatus, Priority priority,
+      long creationTime, String nodeLabelExpression) {
     NMContainerStatus status =
         Records.newRecord(NMContainerStatus.class);
     status.setContainerId(containerId);
@@ -45,6 +56,7 @@ public abstract class NMContainerStatus {
     status.setContainerExitStatus(containerExitStatus);
     status.setPriority(priority);
     status.setCreationTime(creationTime);
+    status.setNodeLabelExpression(nodeLabelExpression);
     return status;
   }
 
@@ -105,4 +117,12 @@ public abstract class NMContainerStatus {
   public abstract long getCreationTime();
 
   public abstract void setCreationTime(long creationTime);
+  
+  /**
+   * Get the node-label-expression in the original ResourceRequest
+   */
+  public abstract String getNodeLabelExpression();
+
+  public abstract void setNodeLabelExpression(
+      String nodeLabelExpression);
 }

+ 20 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@@ -207,6 +208,25 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
     maybeInitBuilder();
     builder.setCreationTime(creationTime);
   }
+  
+  @Override
+  public String getNodeLabelExpression() {
+    NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
+    if (p.hasNodeLabelExpression()) {
+      return p.getNodeLabelExpression();
+    }
+    return CommonNodeLabelsManager.NO_LABEL;
+  }
+
+  @Override
+  public void setNodeLabelExpression(String nodeLabelExpression) {
+    maybeInitBuilder();
+    if (nodeLabelExpression == null) {
+      builder.clearNodeLabelExpression();
+      return;
+    }
+    builder.setNodeLabelExpression(nodeLabelExpression);
+  }
 
   private void mergeLocalToBuilder() {
     if (this.containerId != null
@@ -274,5 +294,4 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
   private PriorityProto convertToProtoFormat(Priority t) {
     return ((PriorityPBImpl)t).getProto();
   }
-
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -92,6 +92,7 @@ message NMContainerStatusProto {
   optional string diagnostics = 5 [default = "N/A"];
   optional int32 container_exit_status = 6;
   optional int64 creation_time = 7;
+  optional string nodeLabelExpression = 8;
 }
 
 message SCMUploaderNotifyRequestProto {

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -432,9 +432,10 @@ public class ContainerImpl implements Container {
     this.readLock.lock();
     try {
       return NMContainerStatus.newInstance(this.containerId, getCurrentState(),
-        getResource(), diagnostics.toString(), exitCode,
-        containerTokenIdentifier.getPriority(),
-        containerTokenIdentifier.getCreationTime());
+          getResource(), diagnostics.toString(), exitCode,
+          containerTokenIdentifier.getPriority(),
+          containerTokenIdentifier.getCreationTime(),
+          containerTokenIdentifier.getNodeLabelExpression());
     } finally {
       this.readLock.unlock();
     }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -809,7 +809,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     ContainerTokenIdentifier containerTokenIdentifier =
         new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
           System.currentTimeMillis() + 100000L, 123, rmIdentifier,
-          Priority.newInstance(0), 0, logAggregationContext);
+          Priority.newInstance(0), 0, logAggregationContext, null);
     Token containerToken =
         BuilderUtils
           .newContainerToken(nodeId, containerTokenSecretManager

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java

@@ -80,4 +80,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
   List<ResourceRequest> getResourceRequests();
 
   String getNodeHttpAddress();
+  
+  String getNodeLabelExpression();
 }

+ 22 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -153,6 +154,7 @@ public class RMContainerImpl implements RMContainer {
   private final EventHandler eventHandler;
   private final ContainerAllocationExpirer containerAllocationExpirer;
   private final String user;
+  private final String nodeLabelExpression;
 
   private Resource reservedResource;
   private NodeId reservedNode;
@@ -162,17 +164,24 @@ public class RMContainerImpl implements RMContainer {
   private ContainerStatus finishedStatus;
   private boolean isAMContainer;
   private List<ResourceRequest> resourceRequests;
-
+  
   public RMContainerImpl(Container container,
       ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
       RMContext rmContext) {
     this(container, appAttemptId, nodeId, user, rmContext, System
-      .currentTimeMillis());
+        .currentTimeMillis(), "");
+  }
+
+  public RMContainerImpl(Container container,
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext, String nodeLabelExpression) {
+    this(container, appAttemptId, nodeId, user, rmContext, System
+      .currentTimeMillis(), nodeLabelExpression);
   }
 
   public RMContainerImpl(Container container,
-      ApplicationAttemptId appAttemptId, NodeId nodeId,
-      String user, RMContext rmContext, long creationTime) {
+      ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+      RMContext rmContext, long creationTime, String nodeLabelExpression) {
     this.stateMachine = stateMachineFactory.make(this);
     this.containerId = container.getId();
     this.nodeId = nodeId;
@@ -185,6 +194,7 @@ public class RMContainerImpl implements RMContainer {
     this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
     this.isAMContainer = false;
     this.resourceRequests = null;
+    this.nodeLabelExpression = nodeLabelExpression;
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
@@ -597,4 +607,12 @@ public class RMContainerImpl implements RMContainer {
       readLock.unlock();
     }
   }
+
+  @Override
+  public String getNodeLabelExpression() {
+    if (nodeLabelExpression == null) {
+      return RMNodeLabelsManager.NO_LABEL;
+    }
+    return nodeLabelExpression;
+  }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -408,7 +408,7 @@ public abstract class AbstractYarnScheduler
     RMContainer rmContainer =
         new RMContainerImpl(container, attemptId, node.getNodeID(),
           applications.get(attemptId.getApplicationId()).getUser(), rmContext,
-          status.getCreationTime());
+          status.getCreationTime(), status.getNodeLabelExpression());
     return rmContainer;
   }
 

+ 10 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -466,9 +467,10 @@ public class SchedulerApplicationAttempt {
       try {
         // create container token and NMToken altogether.
         container.setContainerToken(rmContext.getContainerTokenSecretManager()
-          .createContainerToken(container.getId(), container.getNodeId(),
-            getUser(), container.getResource(), container.getPriority(),
-            rmContainer.getCreationTime(), this.logAggregationContext));
+            .createContainerToken(container.getId(), container.getNodeId(),
+                getUser(), container.getResource(), container.getPriority(),
+                rmContainer.getCreationTime(), this.logAggregationContext,
+                rmContainer.getNodeLabelExpression()));
         NMToken nmToken =
             rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
               getApplicationAttemptId(), container);
@@ -703,4 +705,9 @@ public class SchedulerApplicationAttempt {
         this.attemptResourceUsage, nodePartition, cluster,
         schedulingMode);
   }
+  
+  @VisibleForTesting
+  public ResourceUsage getAppAttemptResourceUsage() {
+    return this.attemptResourceUsage;
+  }
 }

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -146,9 +146,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
     
     // Create RMContainer
-    RMContainer rmContainer = new RMContainerImpl(container, this
-        .getApplicationAttemptId(), node.getNodeID(),
-        appSchedulingInfo.getUser(), this.rmContext);
+    RMContainer rmContainer =
+        new RMContainerImpl(container, this.getApplicationAttemptId(),
+            node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
+            request.getNodeLabelExpression());
 
     // Add it to allContainers list.
     newlyAllocatedContainers.add(rmContainer);

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java

@@ -179,7 +179,7 @@ public class RMContainerTokenSecretManager extends
       String appSubmitter, Resource capability, Priority priority,
       long createTime) {
     return createContainerToken(containerId, nodeId, appSubmitter, capability,
-      priority, createTime, null);
+      priority, createTime, null, null);
   }
 
   /**
@@ -196,7 +196,8 @@ public class RMContainerTokenSecretManager extends
    */
   public Token createContainerToken(ContainerId containerId, NodeId nodeId,
       String appSubmitter, Resource capability, Priority priority,
-      long createTime, LogAggregationContext logAggregationContext) {
+      long createTime, LogAggregationContext logAggregationContext,
+      String nodeLabelExpression) {
     byte[] password;
     ContainerTokenIdentifier tokenIdentifier;
     long expiryTimeStamp =
@@ -210,7 +211,7 @@ public class RMContainerTokenSecretManager extends
             appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
               .getMasterKey().getKeyId(),
             ResourceManager.getClusterTimeStamp(), priority, createTime,
-            logAggregationContext);
+            logAggregationContext, nodeLabelExpression);
       password = this.createPassword(tokenIdentifier);
 
     } finally {

+ 10 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -1984,14 +1984,21 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
       }
     }
   }
-
+  
   public static NMContainerStatus createNMContainerStatus(
       ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+    return createNMContainerStatus(appAttemptId, id, containerState,
+        RMNodeLabelsManager.NO_LABEL);
+  }
+
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+      String nodeLabelExpression) {
     ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
     NMContainerStatus containerReport =
         NMContainerStatus.newInstance(containerId, containerState,
-          Resource.newInstance(1024, 1), "recover container", 0,
-          Priority.newInstance(0), 0);
+            Resource.newInstance(1024, 1), "recover container", 0,
+            Priority.newInstance(0), 0, nodeLabelExpression);
     return containerReport;
   }
 

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -293,10 +293,11 @@ public class TestContainerAllocation {
         public Token createContainerToken(ContainerId containerId,
             NodeId nodeId, String appSubmitter, Resource capability,
             Priority priority, long createTime,
-            LogAggregationContext logAggregationContext) {
+            LogAggregationContext logAggregationContext, String nodeLabelExp) {
           numRetries++;
           return super.createContainerToken(containerId, nodeId, appSubmitter,
-            capability, priority, createTime, logAggregationContext);
+              capability, priority, createTime, logAggregationContext,
+              nodeLabelExp);
         }
       };
     }

+ 282 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java

@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestWorkPreservingRMRestartForNodeLabel {
+  private Configuration conf;
+  private static final int GB = 1024; // 1024 MB
+  
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+  
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  private void checkRMContainerLabelExpression(ContainerId containerId,
+      MockRM rm, String labelExpression) {
+    RMContainer container =
+        rm.getRMContext().getScheduler().getRMContainer(containerId);
+    Assert.assertNotNull("Cannot find RMContainer=" + containerId, container);
+    Assert.assertEquals(labelExpression,
+        container.getNodeLabelExpression());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  public static void waitForNumContainersToRecover(int num, MockRM rm,
+      ApplicationAttemptId attemptId) throws Exception {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+    SchedulerApplicationAttempt attempt =
+        scheduler.getApplicationAttempt(attemptId);
+    while (attempt == null) {
+      System.out.println("Wait for scheduler attempt " + attemptId
+          + " to be created");
+      Thread.sleep(200);
+      attempt = scheduler.getApplicationAttempt(attemptId);
+    }
+    while (attempt.getLiveContainers().size() < num) {
+      System.out.println("Wait for " + num
+          + " containers to recover. currently: "
+          + attempt.getLiveContainers().size());
+      Thread.sleep(200);
+    }
+  }
+  
+  private void checkAppResourceUsage(String partition, ApplicationId appId,
+      MockRM rm, int expectedMemUsage) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp app =
+        cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+    Assert.assertEquals(expectedMemUsage, app.getAppAttemptResourceUsage()
+        .getUsed(partition).getMemory());
+  }
+  
+  private void checkQueueResourceUsage(String partition, String queueName, MockRM rm, int expectedMemUsage) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(expectedMemUsage, queue.getQueueResourceUsage()
+        .getUsed(partition).getMemory());
+  }
+
+  @Test
+  public void testWorkPreservingRestartForNodeLabel() throws Exception {
+    // This test is pretty much similar to testContainerAllocateWithLabel.
+    // Difference is, this test doesn't specify label expression in ResourceRequest,
+    // instead, it uses default queue label expression
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    
+    conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf);
+    
+    // inject node label manager
+    MockRM rm1 =
+        new MockRM(conf,
+            memStore) {
+          @Override
+          public RMNodeLabelsManager createNodeLabelManager() {
+            return mgr;
+          }
+        };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am1.getApplicationAttemptId(), 1), rm1, "x");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am1.getApplicationAttemptId(), 2), rm1, "x");
+
+    // launch an app to queue b1 (label = y), and check all container will
+    // be allocated in h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am2.getApplicationAttemptId(), 1), rm1, "y");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am2.getApplicationAttemptId(), 2), rm1, "y");
+    
+    // launch an app to queue c1 (label = ""), and check all container will
+    // be allocated in h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am3.getApplicationAttemptId(), 1), rm1, "");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am3.getApplicationAttemptId(), 2), rm1, "");
+    
+    // Re-start RM
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+    MockRM rm2 =
+        new MockRM(conf,
+            memStore) {
+          @Override
+          public RMNodeLabelsManager createNodeLabelManager() {
+            return mgr;
+          }
+        };
+    rm2.start();
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+    nm3.setResourceTrackerService(rm2.getResourceTrackerService());
+    
+    // recover app
+    NMContainerStatus app1c1 =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING, "x");
+    NMContainerStatus app1c2 =
+        TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING, "x");
+    nm1.registerNode(Arrays.asList(app1c1, app1c2), null);
+    waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am1.getApplicationAttemptId(), 1), rm1, "x");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am1.getApplicationAttemptId(), 2), rm1, "x");
+    
+    NMContainerStatus app2c1 =
+        TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING, "y");
+    NMContainerStatus app2c2 =
+        TestRMRestart.createNMContainerStatus(am2.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING, "y");
+    nm2.registerNode(Arrays.asList(app2c1, app2c2), null);
+    waitForNumContainersToRecover(2, rm2, am2.getApplicationAttemptId());
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am2.getApplicationAttemptId(), 1), rm1, "y");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am2.getApplicationAttemptId(), 2), rm1, "y");
+    
+    NMContainerStatus app3c1 =
+        TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING, "");
+    NMContainerStatus app3c2 =
+        TestRMRestart.createNMContainerStatus(am3.getApplicationAttemptId(), 2,
+          ContainerState.RUNNING, "");
+    nm3.registerNode(Arrays.asList(app3c1, app3c2), null);
+    waitForNumContainersToRecover(2, rm2, am3.getApplicationAttemptId());
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am3.getApplicationAttemptId(), 1), rm1, "");
+    checkRMContainerLabelExpression(ContainerId.newContainerId(
+        am3.getApplicationAttemptId(), 2), rm1, "");
+    
+    // Check recovered resource usage
+    checkAppResourceUsage("x", app1.getApplicationId(), rm1, 2 * GB);
+    checkAppResourceUsage("y", app2.getApplicationId(), rm1, 2 * GB);
+    checkAppResourceUsage("", app3.getApplicationId(), rm1, 2 * GB);
+    checkQueueResourceUsage("x", "a1", rm1, 2 * GB);
+    checkQueueResourceUsage("y", "b1", rm1, 2 * GB);
+    checkQueueResourceUsage("", "c1", rm1, 2 * GB);
+    checkQueueResourceUsage("x", "a", rm1, 2 * GB);
+    checkQueueResourceUsage("y", "b", rm1, 2 * GB);
+    checkQueueResourceUsage("", "c", rm1, 2 * GB);
+    checkQueueResourceUsage("x", "root", rm1, 2 * GB);
+    checkQueueResourceUsage("y", "root", rm1, 2 * GB);
+    checkQueueResourceUsage("", "root", rm1, 2 * GB);
+
+
+    rm1.close();
+    rm2.close();
+  }
+}