Browse Source

YARN-8925. Updating distributed node attributes only when necessary. Contributed by Tao Yang.

Weiwei Yang 6 years ago
parent
commit
7deef08eb8
16 changed files with 1479 additions and 112 deletions
  1. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 50 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java
  3. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 71 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java
  5. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  6. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
  7. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
  8. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  9. 53 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
  10. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
  11. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  12. 254 77
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  13. 439 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java
  14. 81 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  15. 6 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java
  16. 451 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3596,6 +3596,12 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL =
       2 * 60 * 1000;
 
+  public static final String NM_NODE_ATTRIBUTES_RESYNC_INTERVAL =
+      NM_NODE_ATTRIBUTES_PREFIX + "resync-interval-ms";
+
+  public static final long DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL =
+      2 * 60 * 1000;
+
   // If -1 is configured then no timer task should be created
   public static final String NM_NODE_LABELS_PROVIDER_FETCH_INTERVAL_MS =
       NM_NODE_LABELS_PROVIDER_PREFIX + "fetch-interval-ms";

+ 50 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelUtil.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
 
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -40,6 +41,8 @@ public final class NodeLabelUtil {
       Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_\\.]*");
   private static final Pattern ATTRIBUTE_VALUE_PATTERN =
       Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_.]*");
+  private static final Pattern ATTRIBUTE_NAME_PATTERN =
+      Pattern.compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
 
   public static void checkAndThrowLabelName(String label) throws IOException {
     if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
@@ -57,6 +60,25 @@ public final class NodeLabelUtil {
     }
   }
 
+  public static void checkAndThrowAttributeName(String attributeName)
+      throws IOException {
+    if (attributeName == null || attributeName.isEmpty()
+        || attributeName.length() > MAX_LABEL_LENGTH) {
+      throw new IOException(
+          "attribute name added is empty or exceeds " + MAX_LABEL_LENGTH
+              + " character(s)");
+    }
+    attributeName = attributeName.trim();
+
+    boolean match = ATTRIBUTE_NAME_PATTERN.matcher(attributeName).matches();
+
+    if (!match) {
+      throw new IOException("attribute name should only contains "
+          + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+          + ", now it is= " + attributeName);
+    }
+  }
+
   public static void checkAndThrowAttributeValue(String value)
       throws IOException {
     if (value == null) {
@@ -129,7 +151,9 @@ public final class NodeLabelUtil {
         // Verify attribute prefix format.
         checkAndThrowAttributePrefix(prefix);
         // Verify attribute name format.
-        checkAndThrowLabelName(attributeKey.getAttributeName());
+        checkAndThrowAttributeName(attributeKey.getAttributeName());
+        // Verify attribute value format.
+        checkAndThrowAttributeValue(nodeAttribute.getAttributeValue());
       }
     }
   }
@@ -152,4 +176,29 @@ public final class NodeLabelUtil {
             .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))
         .collect(Collectors.toSet());
   }
+
+  /**
+   * Are these two input node attributes the same.
+   * @return true if they are the same
+   */
+  public static boolean isNodeAttributesEquals(
+      Set<NodeAttribute> leftNodeAttributes,
+      Set<NodeAttribute> rightNodeAttributes) {
+    if (leftNodeAttributes == null && rightNodeAttributes == null) {
+      return true;
+    } else if (leftNodeAttributes == null || rightNodeAttributes == null
+        || leftNodeAttributes.size() != rightNodeAttributes.size()) {
+      return false;
+    }
+    return leftNodeAttributes.stream()
+        .allMatch(e -> isNodeAttributeIncludes(rightNodeAttributes, e));
+  }
+
+  private static boolean isNodeAttributeIncludes(
+      Set<NodeAttribute> nodeAttributes, NodeAttribute checkNodeAttribute) {
+    return nodeAttributes.stream().anyMatch(
+        e -> e.equals(checkNodeAttribute) && Objects
+            .equals(e.getAttributeValue(),
+                checkNodeAttribute.getAttributeValue()));
+  }
 }

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -2976,6 +2976,15 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+      Interval at which NM syncs its node attributes with RM. NM will send its loaded
+      attributes every x intervals configured, along with heartbeat to RM.
+    </description>
+    <name>yarn.nodemanager.node-attributes.resync-interval-ms</name>
+    <value>120000</value>
+  </property>
+
   <property>
     <description>
     Timeout in seconds for YARN node graceful decommission.

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestNodeLabelUtil.java

@@ -18,6 +18,11 @@
 package org.apache.hadoop.yarn.nodelabels;
 
 import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -48,4 +53,70 @@ public class TestNodeLabelUtil {
       }
     }
   }
+
+  @Test
+  public void testIsNodeAttributesEquals() {
+    NodeAttribute nodeAttributeCK1V1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttributeCK1V1Copy = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_CENTRALIZED, "K1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttributeDK1V1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttributeDK1V1Copy = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttributeDK2V1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttributeDK2V2 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "K2",
+            NodeAttributeType.STRING, "V2");
+    /*
+     * equals if set size equals and items are all the same
+     */
+    Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(null, null));
+    Assert.assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(), ImmutableSet.of()));
+    Assert.assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
+            ImmutableSet.of(nodeAttributeCK1V1Copy)));
+    Assert.assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1),
+            ImmutableSet.of(nodeAttributeDK1V1Copy)));
+    Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(
+        ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1),
+        ImmutableSet.of(nodeAttributeCK1V1Copy, nodeAttributeDK1V1Copy)));
+    /*
+     * not equals if set size not equals or items are different
+     */
+    Assert.assertFalse(
+        NodeLabelUtil.isNodeAttributesEquals(null, ImmutableSet.of()));
+    Assert.assertFalse(
+        NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(), null));
+    // different attribute prefix
+    Assert.assertFalse(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
+            ImmutableSet.of(nodeAttributeDK1V1)));
+    // different attribute name
+    Assert.assertFalse(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK1V1),
+            ImmutableSet.of(nodeAttributeDK2V1)));
+    // different attribute value
+    Assert.assertFalse(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeDK2V1),
+            ImmutableSet.of(nodeAttributeDK2V2)));
+    // different set
+    Assert.assertFalse(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
+            ImmutableSet.of()));
+    Assert.assertFalse(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(nodeAttributeCK1V1),
+            ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1)));
+    Assert.assertFalse(NodeLabelUtil.isNodeAttributesEquals(
+        ImmutableSet.of(nodeAttributeCK1V1, nodeAttributeDK1V1),
+        ImmutableSet.of(nodeAttributeDK1V1)));
+  }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -118,4 +118,9 @@ public abstract class NodeHeartbeatResponse {
 
   public abstract void addAllContainersToDecrease(
       Collection<Container> containersToDecrease);
+
+  public abstract boolean getAreNodeAttributesAcceptedByRM();
+
+  public abstract void setAreNodeAttributesAcceptedByRM(
+      boolean areNodeAttributesAcceptedByRM);
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java

@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -50,6 +51,16 @@ public abstract class RegisterNodeManagerRequest {
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
       Resource physicalResource) {
+    return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
+        containerStatuses, runningApplications, nodeLabels, physicalResource,
+        null);
+  }
+
+  public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+      int httpPort, Resource resource, String nodeManagerVersionId,
+      List<NMContainerStatus> containerStatuses,
+      List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
+      Resource physicalResource, Set<NodeAttribute> nodeAttributes) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -60,6 +71,7 @@ public abstract class RegisterNodeManagerRequest {
     request.setRunningApplications(runningApplications);
     request.setNodeLabels(nodeLabels);
     request.setPhysicalResource(physicalResource);
+    request.setNodeAttributes(nodeAttributes);
     return request;
   }
   
@@ -117,4 +129,8 @@ public abstract class RegisterNodeManagerRequest {
 
   public abstract void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps);
+
+  public abstract Set<NodeAttribute> getNodeAttributes();
+
+  public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java

@@ -58,4 +58,9 @@ public abstract class RegisterNodeManagerResponse {
 
   public abstract void setAreNodeLabelsAcceptedByRM(
       boolean areNodeLabelsAcceptedByRM);
+
+  public abstract boolean getAreNodeAttributesAcceptedByRM();
+
+  public abstract void setAreNodeAttributesAcceptedByRM(
+      boolean areNodeAttributesAcceptedByRM);
 }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -787,6 +787,21 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
   }
 
+  @Override
+  public boolean getAreNodeAttributesAcceptedByRM() {
+    NodeHeartbeatResponseProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getAreNodeAttributesAcceptedByRM();
+  }
+
+  @Override
+  public void setAreNodeAttributesAcceptedByRM(
+      boolean areNodeAttributesAcceptedByRM) {
+    maybeInitBuilder();
+    this.builder
+        .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM);
+  }
+
   @Override
   public List<SignalContainerRequest> getContainersToSignalList() {
     initContainersToSignal();

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java

@@ -26,22 +26,26 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeAttributesProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
@@ -58,6 +62,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
   private List<NMContainerStatus> containerStatuses = null;
   private List<ApplicationId> runningApplications = null;
   private Set<NodeLabel> labels = null;
+  private Set<NodeAttribute> attributes = null;
 
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
@@ -101,6 +106,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
       }
       builder.setNodeLabels(newBuilder.build());
     }
+    if (this.attributes != null) {
+      builder.clearNodeAttributes();
+      NodeAttributesProto.Builder attributesBuilder =
+          NodeAttributesProto.newBuilder();
+      for (NodeAttribute attribute : attributes) {
+        attributesBuilder.addNodeAttributes(convertToProtoFormat(attribute));
+      }
+      builder.setNodeAttributes(attributesBuilder.build());
+    }
     if (this.physicalResource != null) {
       builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
     }
@@ -404,6 +418,36 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     }
   }
 
+  @Override
+  public synchronized Set<NodeAttribute> getNodeAttributes() {
+    initNodeAttributes();
+    return this.attributes;
+  }
+
+  @Override
+  public synchronized void setNodeAttributes(
+      Set<NodeAttribute> nodeAttributes) {
+    maybeInitBuilder();
+    builder.clearNodeAttributes();
+    this.attributes = nodeAttributes;
+  }
+
+  private synchronized void initNodeAttributes() {
+    if (this.attributes != null) {
+      return;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeAttributes()) {
+      attributes=null;
+      return;
+    }
+    NodeAttributesProto nodeAttributes = p.getNodeAttributes();
+    attributes = new HashSet<>();
+    for(NodeAttributeProto nap : nodeAttributes.getNodeAttributesList()) {
+      attributes.add(convertFromProtoFormat(nap));
+    }
+  }
+
   private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) {
     return new NodeLabelPBImpl(p);
   }
@@ -412,6 +456,15 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     return ((NodeLabelPBImpl)t).getProto();
   }
 
+  private static NodeAttributePBImpl convertFromProtoFormat(
+      NodeAttributeProto p) {
+    return new NodeAttributePBImpl(p);
+  }
+
+  private static NodeAttributeProto convertToProtoFormat(NodeAttribute t) {
+    return ((NodeAttributePBImpl)t).getProto();
+  }
+
   private static ApplicationIdPBImpl convertFromProtoFormat(
       ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java

@@ -269,4 +269,19 @@ public class RegisterNodeManagerResponsePBImpl
     maybeInitBuilder();
     this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
   }
+
+  @Override
+  public boolean getAreNodeAttributesAcceptedByRM() {
+    RegisterNodeManagerResponseProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    return p.getAreNodeAttributesAcceptedByRM();
+  }
+
+  @Override
+  public void setAreNodeAttributesAcceptedByRM(
+      boolean areNodeAttributesAcceptedByRM) {
+    maybeInitBuilder();
+    this.builder
+        .setAreNodeAttributesAcceptedByRM(areNodeAttributesAcceptedByRM);
+  }
 }  

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -72,6 +72,7 @@ message RegisterNodeManagerRequestProto {
   optional NodeLabelsProto nodeLabels = 8;
   optional ResourceProto physicalResource = 9;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
+  optional NodeAttributesProto nodeAttributes = 11;
 }
 
 message RegisterNodeManagerResponseProto {
@@ -83,6 +84,7 @@ message RegisterNodeManagerResponseProto {
   optional string rm_version = 6;
   optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
   optional ResourceProto resource = 8;
+  optional bool areNodeAttributesAcceptedByRM = 9 [default = false];
 }
 
 message UnRegisterNodeManagerRequestProto {
@@ -128,6 +130,7 @@ message NodeHeartbeatResponseProto {
   repeated AppCollectorDataProto app_collectors = 16;
   // to be used in place of containers_to_decrease
   repeated ContainerProto containers_to_update = 17;
+  optional bool areNodeAttributesAcceptedByRM = 18 [default = false];
 }
 
 message ContainerQueuingLimitProto {

+ 254 - 77
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -377,6 +378,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       throws YarnException, IOException {
     RegisterNodeManagerResponse regNMResponse;
     Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
+    Set<NodeAttribute> nodeAttributes =
+        nodeAttributesHandler.getNodeAttributesForRegistration();
 
     // Synchronize NM-RM registration with
     // ContainerManagerImpl#increaseContainersResource and
@@ -387,7 +390,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       RegisterNodeManagerRequest request =
           RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
               nodeManagerVersionId, containerReports, getRunningApplications(),
-              nodeLabels, physicalResource);
+              nodeLabels, physicalResource, nodeAttributes);
 
       if (containerReports != null) {
         LOG.info("Registering with RM using containers :" + containerReports);
@@ -473,6 +476,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
     successfullRegistrationMsg.append(nodeLabelsHandler
         .verifyRMRegistrationResponseForNodeLabels(regNMResponse));
+    successfullRegistrationMsg.append(nodeAttributesHandler
+        .verifyRMRegistrationResponseForNodeAttributes(regNMResponse));
 
     LOG.info(successfullRegistrationMsg.toString());
   }
@@ -875,34 +880,254 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
    */
   private NMNodeAttributesHandler createNMNodeAttributesHandler(
       NodeAttributesProvider provider) {
-    return provider == null ? null :
-        new NMDistributedNodeAttributesHandler(nodeAttributesProvider);
+    if (provider == null) {
+      return new NMCentralizedNodeAttributesHandler();
+    } else {
+      return new NMDistributedNodeAttributesHandler(provider, this.getConfig());
+    }
+  }
+
+  private static abstract class CachedNodeDescriptorHandler<T> {
+    private final long resyncInterval;
+    private final T defaultValue;
+    private T previousValue;
+    private long lastSendMills = 0L;
+    private boolean isValueSented;
+
+    CachedNodeDescriptorHandler(T defaultValue,
+        long resyncInterval) {
+      this.defaultValue = defaultValue;
+      this.resyncInterval = resyncInterval;
+    }
+
+    public abstract T getValueFromProvider();
+
+    public T getValueForRegistration() {
+      T value = getValueFromProvider();
+      if (defaultValue != null) {
+        value = (null == value) ? defaultValue : value;
+      }
+      previousValue = value;
+      try {
+        validate(value);
+      } catch (IOException e) {
+        value = null;
+      }
+      return value;
+    }
+
+    public T getValueForHeartbeat() {
+      T value = getValueFromProvider();
+      // if the provider returns null then consider default value are set
+      if (defaultValue != null) {
+        value = (null == value) ? defaultValue : value;
+      }
+      // take some action only on modification of value
+      boolean isValueUpdated = isValueUpdated(value);
+
+      isValueSented = false;
+      // When value updated or resync time is elapsed will send again in
+      // heartbeat.
+      if (isValueUpdated || isResyncIntervalElapsed()) {
+        previousValue = value;
+        try {
+          validate(value);
+          isValueSented = true;
+        } catch (IOException e) {
+          // take previous value to replace invalid value, so that invalid
+          // value are not verified for every HB, and send empty set
+          // to RM to have same value which was earlier set.
+          value = null;
+        } finally {
+          // Set last send time in heartbeat
+          lastSendMills = System.currentTimeMillis();
+        }
+      } else {
+        // if value have not changed then no need to send
+        value = null;
+      }
+      return value;
+    }
+
+    /**
+     * This method checks resync interval is elapsed or not.
+     */
+    public boolean isResyncIntervalElapsed() {
+      long elapsedTimeSinceLastSync =
+          System.currentTimeMillis() - lastSendMills;
+      if (elapsedTimeSinceLastSync > resyncInterval) {
+        return true;
+      }
+      return false;
+    }
+
+    protected abstract void validate(T value) throws IOException;
+
+    protected abstract boolean isValueUpdated(T value);
+
+    public long getResyncInterval() {
+      return resyncInterval;
+    }
+
+    public T getDefaultValue() {
+      return defaultValue;
+    }
+
+    public T getPreviousValue() {
+      return previousValue;
+    }
+
+    public long getLastSendMills() {
+      return lastSendMills;
+    }
+
+    public boolean isValueSented() {
+      return isValueSented;
+    }
   }
 
   private interface NMNodeAttributesHandler {
 
+    /**
+     * validates nodeAttributes From Provider and returns it to the caller. Also
+     * ensures that if provider returns null then empty set is considered
+     */
+    Set<NodeAttribute> getNodeAttributesForRegistration();
+
     /**
      * @return the node attributes of this node manager.
      */
     Set<NodeAttribute> getNodeAttributesForHeartbeat();
+
+    /**
+     * @return RMRegistration Success message and on failure will log
+     *         independently and returns empty string
+     */
+    String verifyRMRegistrationResponseForNodeAttributes(
+        RegisterNodeManagerResponse regNMResponse);
+
+    /**
+     * check whether if updated attributes sent to RM was accepted or not.
+     * @param response
+     */
+    void verifyRMHeartbeatResponseForNodeAttributes(
+        NodeHeartbeatResponse response);
+  }
+
+
+  /**
+   * In centralized configuration, NM need not send Node attributes or process
+   * the response.
+   */
+  private static class NMCentralizedNodeAttributesHandler
+      implements NMNodeAttributesHandler {
+    @Override
+    public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
+      return null;
+    }
+
+    @Override
+    public Set<NodeAttribute> getNodeAttributesForRegistration() {
+      return null;
+    }
+
+    @Override
+    public void verifyRMHeartbeatResponseForNodeAttributes(
+        NodeHeartbeatResponse response) {
+    }
+
+    @Override
+    public String verifyRMRegistrationResponseForNodeAttributes(
+        RegisterNodeManagerResponse regNMResponse) {
+      return "";
+    }
   }
 
   private static class NMDistributedNodeAttributesHandler
+      extends CachedNodeDescriptorHandler<Set<NodeAttribute>>
       implements NMNodeAttributesHandler {
 
     private final NodeAttributesProvider attributesProvider;
 
     protected NMDistributedNodeAttributesHandler(
-        NodeAttributesProvider provider) {
+        NodeAttributesProvider provider, Configuration conf) {
+      super(Collections.unmodifiableSet(new HashSet<>(0)),
+          conf.getLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL,
+              YarnConfiguration.DEFAULT_NM_NODE_ATTRIBUTES_RESYNC_INTERVAL));
       this.attributesProvider = provider;
     }
 
+    @Override
+    public Set<NodeAttribute> getNodeAttributesForRegistration() {
+      return getValueForRegistration();
+    }
+
     @Override
     public Set<NodeAttribute> getNodeAttributesForHeartbeat() {
+      return getValueForHeartbeat();
+    }
+
+    @Override
+    public Set<NodeAttribute> getValueFromProvider() {
       return attributesProvider.getDescriptors();
     }
-  }
 
+    @Override
+    protected void validate(Set<NodeAttribute> nodeAttributes)
+        throws IOException {
+      try {
+        NodeLabelUtil.validateNodeAttributes(nodeAttributes);
+      } catch (IOException e) {
+        LOG.error(
+            "Invalid node attribute(s) from Provider : " + e.getMessage());
+        throw e;
+      }
+    }
+
+    @Override
+    protected boolean isValueUpdated(Set<NodeAttribute> value) {
+      return !NodeLabelUtil.isNodeAttributesEquals(getPreviousValue(), value);
+    }
+
+    @Override
+    public String verifyRMRegistrationResponseForNodeAttributes(
+        RegisterNodeManagerResponse regNMResponse) {
+      StringBuilder successfulNodeAttributesRegistrationMsg =
+          new StringBuilder();
+      if (regNMResponse.getAreNodeAttributesAcceptedByRM()) {
+        successfulNodeAttributesRegistrationMsg
+            .append(" and with following Node attribute(s) : {")
+            .append(getPreviousValue()).append("}");
+      } else {
+        // case where provider is set but RM did not accept the node attributes
+        String errorMsgFromRM = regNMResponse.getDiagnosticsMessage();
+        LOG.error("Node attributes sent from NM while registration were"
+            + " rejected by RM. " + ((errorMsgFromRM == null) ?
+            "Seems like RM is configured with Centralized Attributes." :
+            "And with message " + regNMResponse.getDiagnosticsMessage()));
+      }
+      return successfulNodeAttributesRegistrationMsg.toString();
+    }
+
+    @Override
+    public void verifyRMHeartbeatResponseForNodeAttributes(
+        NodeHeartbeatResponse response) {
+      if (isValueSented()) {
+        if (response.getAreNodeAttributesAcceptedByRM()) {
+          if(LOG.isDebugEnabled()){
+            LOG.debug("Node attributes {" + getPreviousValue()
+                + "} were Accepted by RM ");
+          }
+        } else {
+          // case where updated node attributes from NodeAttributesProvider
+          // is sent to RM and RM rejected the attributes
+          LOG.error("NM node attributes {" + getPreviousValue()
+              + "} were not accepted by RM and message from RM : " + response
+              .getDiagnosticsMessage());
+        }
+      }
+    }
+  }
 
   private static interface NMNodeLabelsHandler {
     /**
@@ -963,33 +1188,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   }
 
   private static class NMDistributedNodeLabelsHandler
+      extends CachedNodeDescriptorHandler<Set<NodeLabel>>
       implements NMNodeLabelsHandler {
+
     private NMDistributedNodeLabelsHandler(
         NodeLabelsProvider nodeLabelsProvider, Configuration conf) {
-      this.nodeLabelsProvider = nodeLabelsProvider;
-      this.resyncInterval =
+      super(CommonNodeLabelsManager.EMPTY_NODELABEL_SET,
           conf.getLong(YarnConfiguration.NM_NODE_LABELS_RESYNC_INTERVAL,
-              YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL);
+              YarnConfiguration.DEFAULT_NM_NODE_LABELS_RESYNC_INTERVAL));
+      this.nodeLabelsProvider = nodeLabelsProvider;
     }
 
     private final NodeLabelsProvider nodeLabelsProvider;
-    private Set<NodeLabel> previousNodeLabels;
-    private boolean areLabelsSentToRM;
-    private long lastNodeLabelSendMills = 0L;
-    private final long resyncInterval;
 
     @Override
     public Set<NodeLabel> getNodeLabelsForRegistration() {
-      Set<NodeLabel> nodeLabels = nodeLabelsProvider.getDescriptors();
-      nodeLabels = (null == nodeLabels)
-          ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET : nodeLabels;
-      previousNodeLabels = nodeLabels;
-      try {
-        validateNodeLabels(nodeLabels);
-      } catch (IOException e) {
-        nodeLabels = null;
-      }
-      return nodeLabels;
+      return getValueForRegistration();
     }
 
     @Override
@@ -999,7 +1213,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
         successfulNodeLabelsRegistrationMsg
             .append(" and with following Node label(s) : {")
-            .append(StringUtils.join(",", previousNodeLabels)).append("}");
+            .append(StringUtils.join(",", getPreviousValue())).append("}");
       } else {
         // case where provider is set but RM did not accept the Node Labels
         String errorMsgFromRM = regNMResponse.getDiagnosticsMessage();
@@ -1014,50 +1228,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
     @Override
     public Set<NodeLabel> getNodeLabelsForHeartbeat() {
-      Set<NodeLabel> nodeLabelsForHeartbeat =
-          nodeLabelsProvider.getDescriptors();
-      // if the provider returns null then consider empty labels are set
-      nodeLabelsForHeartbeat = (nodeLabelsForHeartbeat == null)
-          ? CommonNodeLabelsManager.EMPTY_NODELABEL_SET
-          : nodeLabelsForHeartbeat;
-      // take some action only on modification of labels
-      boolean areNodeLabelsUpdated =
-          nodeLabelsForHeartbeat.size() != previousNodeLabels.size()
-              || !previousNodeLabels.containsAll(nodeLabelsForHeartbeat);
-
-      areLabelsSentToRM = false;
-      // When nodelabels elapsed or resync time is elapsed will send again in
-      // heartbeat.
-      if (areNodeLabelsUpdated || isResyncIntervalElapsed()) {
-        previousNodeLabels = nodeLabelsForHeartbeat;
-        try {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Labels from provider: "
-                + StringUtils.join(",", previousNodeLabels));
-          }
-          validateNodeLabels(nodeLabelsForHeartbeat);
-          areLabelsSentToRM = true;
-        } catch (IOException e) {
-          // set previous node labels to invalid set, so that invalid
-          // labels are not verified for every HB, and send empty set
-          // to RM to have same nodeLabels which was earlier set.
-          nodeLabelsForHeartbeat = null;
-        } finally {
-          // Set last send time in heartbeat
-          lastNodeLabelSendMills = System.currentTimeMillis();
-        }
-      } else {
-        // if nodelabels have not changed then no need to send
-        nodeLabelsForHeartbeat = null;
-      }
-      return nodeLabelsForHeartbeat;
+      return getValueForHeartbeat();
     }
 
-    private void validateNodeLabels(Set<NodeLabel> nodeLabelsForHeartbeat)
+    protected void validate(Set<NodeLabel> nodeLabels)
         throws IOException {
-      Iterator<NodeLabel> iterator = nodeLabelsForHeartbeat.iterator();
+      Iterator<NodeLabel> iterator = nodeLabels.iterator();
       boolean hasInvalidLabel = false;
-      StringBuilder errorMsg = new StringBuilder("");
+      StringBuilder errorMsg = new StringBuilder();
       while (iterator.hasNext()) {
         try {
           NodeLabelUtil
@@ -1074,33 +1252,31 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       }
     }
 
-    /*
-     * This method checks resync interval is elapsed or not.
-     */
-    public boolean isResyncIntervalElapsed() {
-      long elapsedTimeSinceLastSync =
-          System.currentTimeMillis() - lastNodeLabelSendMills;
-      if (elapsedTimeSinceLastSync > resyncInterval) {
-        return true;
-      }
-      return false;
+    @Override
+    public Set<NodeLabel> getValueFromProvider() {
+      return this.nodeLabelsProvider.getDescriptors();
+    }
+
+    @Override
+    protected boolean isValueUpdated(Set<NodeLabel> value) {
+      return !Objects.equals(value, getPreviousValue());
     }
 
     @Override
     public void verifyRMHeartbeatResponseForNodeLabels(
         NodeHeartbeatResponse response) {
-      if (areLabelsSentToRM) {
+      if (isValueSented()) {
         if (response.getAreNodeLabelsAcceptedByRM()) {
           if(LOG.isDebugEnabled()){
             LOG.debug(
-                "Node Labels {" + StringUtils.join(",", previousNodeLabels)
+                "Node Labels {" + StringUtils.join(",", getPreviousValue())
                     + "} were Accepted by RM ");
           }
         } else {
           // case where updated labels from NodeLabelsProvider is sent to RM and
           // RM rejected the labels
           LOG.error(
-              "NM node labels {" + StringUtils.join(",", previousNodeLabels)
+              "NM node labels {" + StringUtils.join(",", getPreviousValue())
                   + "} were not accepted by RM and message from RM : "
                   + response.getDiagnosticsMessage());
         }
@@ -1120,7 +1296,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           Set<NodeLabel> nodeLabelsForHeartbeat =
               nodeLabelsHandler.getNodeLabelsForHeartbeat();
           Set<NodeAttribute> nodeAttributesForHeartbeat =
-              nodeAttributesHandler == null ? null :
                   nodeAttributesHandler.getNodeAttributesForHeartbeat();
           NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
           NodeHeartbeatRequest request =
@@ -1153,6 +1328,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
           if (!handleShutdownOrResyncCommand(response)) {
             nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
                 response);
+            nodeAttributesHandler
+                .verifyRMHeartbeatResponseForNodeAttributes(response);
 
             // Explicitly put this method after checking the resync
             // response. We

+ 439 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForAttributes.java

@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.Thread.State;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test NodeStatusUpdater for node attributes.
+ */
+public class TestNodeStatusUpdaterForAttributes extends NodeLabelTestBase {
+  private static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private NodeManager nm;
+  private DummyNodeAttributesProvider dummyAttributesProviderRef;
+
+  @Before
+  public void setup() {
+    dummyAttributesProviderRef = new DummyNodeAttributesProvider();
+  }
+
+  @After
+  public void tearDown() {
+    if (null != nm) {
+      ServiceOperations.stop(nm);
+    }
+  }
+
+  private class ResourceTrackerForAttributes implements ResourceTracker {
+    private int heartbeatID = 0;
+    private Set<NodeAttribute> attributes;
+
+    private boolean receivedNMHeartbeat = false;
+    private boolean receivedNMRegister = false;
+
+    private MasterKey createMasterKey() {
+      MasterKey masterKey = new MasterKeyPBImpl();
+      masterKey.setKeyId(123);
+      masterKey.setBytes(
+          ByteBuffer.wrap(new byte[] {new Integer(123).byteValue() }));
+      return masterKey;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnException, IOException {
+      attributes = request.getNodeAttributes();
+      RegisterNodeManagerResponse response =
+          RECORD_FACTORY.newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(NodeAction.NORMAL);
+      response.setContainerTokenMasterKey(createMasterKey());
+      response.setNMTokenMasterKey(createMasterKey());
+      response.setAreNodeAttributesAcceptedByRM(attributes != null);
+      synchronized (ResourceTrackerForAttributes.class) {
+        receivedNMRegister = true;
+        ResourceTrackerForAttributes.class.notifyAll();
+      }
+      return response;
+    }
+
+    public void waitTillHeartbeat()
+        throws InterruptedException, TimeoutException {
+      GenericTestUtils.waitFor(() -> receivedNMHeartbeat, 100, 30000);
+      if (!receivedNMHeartbeat) {
+        Assert.fail("Heartbeat is not received even after waiting");
+      }
+    }
+
+    public void waitTillRegister()
+        throws InterruptedException, TimeoutException {
+      GenericTestUtils.waitFor(() -> receivedNMRegister, 100, 30000);
+      if (!receivedNMRegister) {
+        Assert.fail("Registration is not received even after waiting");
+      }
+    }
+
+    /**
+     * Flag to indicate received any.
+     */
+    public void resetNMHeartbeatReceiveFlag() {
+      synchronized (ResourceTrackerForAttributes.class) {
+        receivedNMHeartbeat = false;
+      }
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(
+        NodeHeartbeatRequest request) {
+      attributes = request.getNodeAttributes();
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartbeatID++);
+
+      NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils
+          .newNodeHeartbeatResponse(heartbeatID, NodeAction.NORMAL, null, null,
+              null, null, 1000L);
+
+      // to ensure that heartbeats are sent only when required.
+      nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
+      nhResponse.setAreNodeAttributesAcceptedByRM(attributes != null);
+
+      synchronized (ResourceTrackerForAttributes.class) {
+        receivedNMHeartbeat = true;
+        ResourceTrackerForAttributes.class.notifyAll();
+      }
+      return nhResponse;
+    }
+
+    @Override
+    public UnRegisterNodeManagerResponse unRegisterNodeManager(
+        UnRegisterNodeManagerRequest request) {
+      return null;
+    }
+  }
+
+  /**
+   * A dummy NodeAttributesProvider class for tests.
+   */
+  public static class DummyNodeAttributesProvider
+      extends NodeAttributesProvider {
+
+    public DummyNodeAttributesProvider() {
+      super("DummyNodeAttributesProvider");
+      // disable the fetch timer.
+      setIntervalTime(-1);
+    }
+
+    @Override
+    protected void cleanUp() throws Exception {
+      // fake implementation, nothing to cleanup
+    }
+
+    @Override
+    public TimerTask createTimerTask() {
+      return new TimerTask() {
+        @Override
+        public void run() {
+          setDescriptors(Collections.unmodifiableSet(new HashSet<>(0)));
+        }
+      };
+    }
+  }
+
+  private YarnConfiguration createNMConfigForDistributeNodeAttributes() {
+    YarnConfiguration conf = new YarnConfiguration();
+    return conf;
+  }
+
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterForNodeAttributes()
+      throws InterruptedException, IOException, TimeoutException {
+    final ResourceTrackerForAttributes resourceTracker =
+        new ResourceTrackerForAttributes();
+    nm = new NodeManager() {
+      @Override
+      protected NodeAttributesProvider createNodeAttributesProvider(
+          Configuration conf) throws IOException {
+        return dummyAttributesProviderRef;
+      }
+
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(
+          Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker) {
+
+        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+            metrics) {
+          @Override
+          protected ResourceTracker getRMClient() {
+            return resourceTracker;
+          }
+
+          @Override
+          protected void stopRMProxy() {
+            return;
+          }
+        };
+      }
+    };
+
+    YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
+    conf.setLong(YarnConfiguration.NM_NODE_ATTRIBUTES_RESYNC_INTERVAL, 2000);
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+        "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));
+
+    nm.init(conf);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    nm.start();
+    resourceTracker.waitTillRegister();
+    assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
+            resourceTracker.attributes));
+
+    resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat with updated attributes
+    NodeAttribute attribute1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+            NodeAttributeType.STRING, "V1");
+    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
+
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
+            resourceTracker.attributes));
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat without updating attributes
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    assertNull("If no change in attributes"
+            + " then null should be sent as part of request",
+        resourceTracker.attributes);
+
+    // provider return with null attributes
+    dummyAttributesProviderRef.setDescriptors(null);
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertNotNull("If provider sends null"
+            + " then empty label set should be sent and not null",
+        resourceTracker.attributes);
+    assertTrue("If provider sends null then empty attributes should be sent",
+        resourceTracker.attributes.isEmpty());
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    // Since the resync interval is set to 2 sec in every alternate heartbeat
+    // the attributes will be send along with heartbeat.
+    // In loop we sleep for 1 sec
+    // so that every sec 1 heartbeat is send.
+    int nullAttributes = 0;
+    int nonNullAttributes = 0;
+    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
+    for (int i = 0; i < 5; i++) {
+      sendOutofBandHeartBeat();
+      resourceTracker.waitTillHeartbeat();
+      if (null == resourceTracker.attributes) {
+        nullAttributes++;
+      } else {
+        Assert.assertTrue("In heartbeat PI attributes should be send",
+            NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
+                resourceTracker.attributes));
+        nonNullAttributes++;
+      }
+      resourceTracker.resetNMHeartbeatReceiveFlag();
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue("More than one heartbeat with empty attributes expected",
+        nullAttributes > 1);
+    Assert.assertTrue("More than one heartbeat with attributes expected",
+        nonNullAttributes > 1);
+    nm.stop();
+  }
+
+  @Test(timeout = 20000)
+  public void testInvalidNodeAttributesFromProvider()
+      throws InterruptedException, IOException, TimeoutException {
+    final ResourceTrackerForAttributes resourceTracker =
+        new ResourceTrackerForAttributes();
+    nm = new NodeManager() {
+      @Override protected NodeAttributesProvider createNodeAttributesProvider(
+          Configuration conf) throws IOException {
+        return dummyAttributesProviderRef;
+      }
+
+      @Override protected NodeStatusUpdater createNodeStatusUpdater(
+          Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker) {
+
+        return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+            metrics) {
+          @Override protected ResourceTracker getRMClient() {
+            return resourceTracker;
+          }
+
+          @Override protected void stopRMProxy() {
+            return;
+          }
+        };
+      }
+    };
+
+    YarnConfiguration conf = createNMConfigForDistributeNodeAttributes();
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+        "0.0.0.0:" + ServerSocketUtil.getPort(8040, 10));
+    nm.init(conf);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+    nm.start();
+    resourceTracker.waitTillRegister();
+    assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(dummyAttributesProviderRef.getDescriptors(),
+            resourceTracker.attributes));
+
+    resourceTracker.waitTillHeartbeat(); // wait till the first heartbeat
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // update attribute1
+    NodeAttribute attribute1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+            NodeAttributeType.STRING, "V1");
+    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute1));
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute1),
+        resourceTracker.attributes));
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // update attribute2
+    NodeAttribute attribute2 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "V2");
+    dummyAttributesProviderRef.setDescriptors(ImmutableSet.of(attribute2));
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertTrue(NodeLabelUtil.isNodeAttributesEquals(ImmutableSet.of(attribute2),
+        resourceTracker.attributes));
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // update attribute2 & attribute2
+    dummyAttributesProviderRef
+        .setDescriptors(ImmutableSet.of(attribute1, attribute2));
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertTrue(NodeLabelUtil
+        .isNodeAttributesEquals(ImmutableSet.of(attribute1, attribute2),
+            resourceTracker.attributes));
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // heartbeat with invalid attributes
+    NodeAttribute invalidAttribute = NodeAttribute
+        .newInstance("_.P", "Attr1", NodeAttributeType.STRING, "V1");
+    dummyAttributesProviderRef
+        .setDescriptors(ImmutableSet.of(invalidAttribute));
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertNull("On Invalid Attributes we need to retain earlier attributes, HB"
+        + " needs to send null", resourceTracker.attributes);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+
+    // on next heartbeat same invalid attributes will be given by the provider,
+    // but again validation check and reset RM with invalid attributes set
+    // should not happen
+    sendOutofBandHeartBeat();
+    resourceTracker.waitTillHeartbeat();
+    assertNull("NodeStatusUpdater need not send repeatedly empty attributes on"
+        + " invalid attributes from provider ", resourceTracker.attributes);
+    resourceTracker.resetNMHeartbeatReceiveFlag();
+  }
+
+  /**
+   * This is to avoid race condition in the test case. NodeStatusUpdater
+   * heartbeat thread after sending the heartbeat needs some time to process the
+   * response and then go wait state. But in the test case once the main test
+   * thread returns back after resourceTracker.waitTillHeartbeat() we proceed
+   * with next sendOutofBandHeartBeat before heartbeat thread is blocked on
+   * wait.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private void sendOutofBandHeartBeat()
+      throws InterruptedException, IOException {
+    int i = 0;
+    do {
+      State statusUpdaterThreadState =
+          ((NodeStatusUpdaterImpl) nm.getNodeStatusUpdater())
+              .getStatusUpdaterThreadState();
+      if (statusUpdaterThreadState.equals(Thread.State.TIMED_WAITING)
+          || statusUpdaterThreadState.equals(Thread.State.WAITING)) {
+        nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+        break;
+      }
+      if (++i <= 10) {
+        Thread.sleep(50);
+      } else {
+        throw new IOException("Waited for 500 ms"
+            + " but NodeStatusUpdaterThread not in waiting state");
+      }
+    } while (true);
+  }
+}

+ 81 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -483,6 +484,22 @@ public class ResourceTrackerService extends AbstractService implements
       this.rmContext.getRMDelegatedNodeLabelsUpdater().updateNodeLabels(nodeId);
     }
 
+    // Update node's attributes to RM's NodeAttributesManager.
+    if (request.getNodeAttributes() != null) {
+      try {
+        // update node attributes if necessary then update heartbeat response
+        updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
+        response.setAreNodeAttributesAcceptedByRM(true);
+      } catch (IOException ex) {
+        //ensure the error message is captured and sent across in response
+        String errorMsg = response.getDiagnosticsMessage() == null ?
+            ex.getMessage() :
+            response.getDiagnosticsMessage() + "\n" + ex.getMessage();
+        response.setDiagnosticsMessage(errorMsg);
+        response.setAreNodeAttributesAcceptedByRM(false);
+      }
+    }
+
     StringBuilder message = new StringBuilder();
     message.append("NodeManager from node ").append(host).append("(cmPort: ")
         .append(cmPort).append(" httpPort: ");
@@ -493,6 +510,10 @@ public class ResourceTrackerService extends AbstractService implements
       message.append(", node labels { ").append(
           StringUtils.join(",", nodeLabels) + " } ");
     }
+    if (response.getAreNodeAttributesAcceptedByRM()) {
+      message.append(", node attributes { ")
+          .append(request.getNodeAttributes() + " } ");
+    }
 
     LOG.info(message.toString());
     response.setNodeAction(NodeAction.NORMAL);
@@ -650,34 +671,72 @@ public class ResourceTrackerService extends AbstractService implements
 
     // 8. Get node's attributes and update node-to-attributes mapping
     // in RMNodeAttributeManager.
-    Set<NodeAttribute> nodeAttributes = request.getNodeAttributes();
-    if (nodeAttributes != null && !nodeAttributes.isEmpty()) {
-      nodeAttributes.forEach(nodeAttribute ->
-          LOG.debug(nodeId.toString() + " ATTRIBUTE : "
-              + nodeAttribute.toString()));
-
-      // Validate attributes
-      if (!nodeAttributes.stream().allMatch(
-          nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
-              .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) {
-        // All attributes must be in same prefix: nm.yarn.io.
-        // Since we have the checks in NM to make sure attributes reported
-        // in HB are with correct prefix, so it should not reach here.
-        LOG.warn("Reject invalid node attributes from host: "
-            + nodeId.toString() + ", attributes in HB must have prefix "
-            + NodeAttribute.PREFIX_DISTRIBUTED);
-      } else {
-        // Replace all distributed node attributes associated with this host
-        // with the new reported attributes in node attribute manager.
-        this.rmContext.getNodeAttributesManager()
-            .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
-                ImmutableMap.of(nodeId.getHost(), nodeAttributes));
+    if (request.getNodeAttributes() != null) {
+      try {
+        // update node attributes if necessary then update heartbeat response
+        updateNodeAttributesIfNecessary(nodeId, request.getNodeAttributes());
+        nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(true);
+      } catch (IOException ex) {
+        //ensure the error message is captured and sent across in response
+        String errorMsg =
+            nodeHeartBeatResponse.getDiagnosticsMessage() == null ?
+                ex.getMessage() :
+                nodeHeartBeatResponse.getDiagnosticsMessage() + "\n" + ex
+                    .getMessage();
+        nodeHeartBeatResponse.setDiagnosticsMessage(errorMsg);
+        nodeHeartBeatResponse.setAreNodeAttributesAcceptedByRM(false);
       }
     }
 
     return nodeHeartBeatResponse;
   }
 
+  /**
+   * Update node attributes if necessary.
+   * @param nodeId - node id
+   * @param nodeAttributes - node attributes
+   * @return true if updated
+   * @throws IOException if prefix type is not distributed
+   */
+  private void updateNodeAttributesIfNecessary(NodeId nodeId,
+      Set<NodeAttribute> nodeAttributes) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      nodeAttributes.forEach(nodeAttribute -> LOG.debug(
+          nodeId.toString() + " ATTRIBUTE : " + nodeAttribute.toString()));
+    }
+
+    // Validate attributes
+    if (!nodeAttributes.stream().allMatch(
+        nodeAttribute -> NodeAttribute.PREFIX_DISTRIBUTED
+            .equals(nodeAttribute.getAttributeKey().getAttributePrefix()))) {
+      // All attributes must be in same prefix: nm.yarn.io.
+      // Since we have the checks in NM to make sure attributes reported
+      // in HB are with correct prefix, so it should not reach here.
+      throw new IOException("Reject invalid node attributes from host: "
+          + nodeId.toString() + ", attributes in HB must have prefix "
+          + NodeAttribute.PREFIX_DISTRIBUTED);
+    }
+    // Replace all distributed node attributes associated with this host
+    // with the new reported attributes in node attribute manager.
+    Set<NodeAttribute> currentNodeAttributes =
+        this.rmContext.getNodeAttributesManager()
+            .getAttributesForNode(nodeId.getHost()).keySet();
+    if (!currentNodeAttributes.isEmpty()) {
+      currentNodeAttributes = NodeLabelUtil
+          .filterAttributesByPrefix(currentNodeAttributes,
+              NodeAttribute.PREFIX_DISTRIBUTED);
+    }
+    if (!NodeLabelUtil
+        .isNodeAttributesEquals(nodeAttributes, currentNodeAttributes)) {
+      this.rmContext.getNodeAttributesManager()
+          .replaceNodeAttributes(NodeAttribute.PREFIX_DISTRIBUTED,
+              ImmutableMap.of(nodeId.getHost(), nodeAttributes));
+    } else if (LOG.isDebugEnabled()) {
+      LOG.debug("Skip updating node attributes since there is no change for "
+          + nodeId + " : " + nodeAttributes);
+    }
+  }
+
   private int getNextResponseId(int responseId) {
     // Loop between 0 and Integer.MAX_VALUE
     return (responseId + 1) & Integer.MAX_VALUE;

+ 6 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java

@@ -221,10 +221,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
       // Notify RM
       if (rmContext != null && rmContext.getDispatcher() != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Updated NodeAttribute event to RM:"
-              + newNodeToAttributesMap.values());
-        }
+        LOG.info("Updated NodeAttribute event to RM:"
+            + newNodeToAttributesMap);
         rmContext.getDispatcher().getEventHandler().handle(
             new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
       }
@@ -306,9 +304,11 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
       for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) {
         NodeAttributeKey attributeKey = attribute.getAttributeKey();
         String attributeName = attributeKey.getAttributeName().trim();
-        NodeLabelUtil.checkAndThrowLabelName(attributeName);
+        NodeLabelUtil.checkAndThrowAttributeName(attributeName);
         NodeLabelUtil
             .checkAndThrowAttributePrefix(attributeKey.getAttributePrefix());
+        NodeLabelUtil
+            .checkAndThrowAttributeValue(attribute.getAttributeValue());
 
         // ensure trimmed values are set back
         attributeKey.setAttributeName(attributeName);
@@ -747,8 +747,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
 
     // Notify RM
     if (rmContext != null && rmContext.getDispatcher() != null) {
-      LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
-          .values());
+      LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap);
       rmContext.getDispatcher().getEventHandler().handle(
           new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
     }

+ 451 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelUtil;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.FileSystemNodeAttributeStore;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -42,6 +46,7 @@ import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerFactory;
@@ -113,6 +118,9 @@ import org.apache.hadoop.yarn.util.YarnVersionInfo;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
@@ -730,6 +738,137 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     }
   }
 
+  @Test
+  public void testNodeRegistrationWithAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    File tempDir = File.createTempFile("nattr", ".tmp");
+    tempDir.delete();
+    tempDir.mkdirs();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        tempDir.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    NodeAttribute nodeAttribute1 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute nodeAttribute2 = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "V2");
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    registerReq.setNodeAttributes(toSet(nodeAttribute1, nodeAttribute2));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Assert.assertEquals("Action should be normal on valid Node Attributes",
+        NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertTrue(NodeLabelUtil.isNodeAttributesEquals(
+        rm.getRMContext().getNodeAttributesManager()
+            .getAttributesForNode(nodeId.getHost()).keySet(),
+        registerReq.getNodeAttributes()));
+    Assert.assertTrue("Valid Node Attributes were not accepted by RM",
+        response.getAreNodeAttributesAcceptedByRM());
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  @Test
+  public void testNodeRegistrationWithInvalidAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest req =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    NodeAttribute validNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr1",
+            NodeAttributeType.STRING, "V1");
+    NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
+        .newInstance("_P", "Attr1",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidNameNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidValueNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "...");
+    req.setResource(capability);
+    req.setNodeId(nodeId);
+    req.setHttpPort(1234);
+    req.setNMVersion(YarnVersionInfo.getVersion());
+
+    // check invalid prefix
+    req.setNodeAttributes(
+        toSet(validNodeAttribute, invalidPrefixNodeAttribute));
+    RegisterNodeManagerResponse response =
+        resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .endsWith("attributes in HB must have prefix nm.yarn.io"));
+
+    // check invalid name
+    req.setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
+    response = resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute name should only contains"));
+
+    // check invalid value
+    req.setNodeAttributes(toSet(validNodeAttribute, invalidValueNodeAttribute));
+    response = resourceTrackerService.registerNodeManager(req);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertRegisterResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute value should only contains"));
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  private void assertRegisterResponseForInvalidAttributes(
+      RegisterNodeManagerResponse response) {
+    Assert.assertEquals(
+        "On Invalid Node Labels action is expected to be normal",
+            NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNotNull(response.getDiagnosticsMessage());
+    Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
+        response.getAreNodeLabelsAcceptedByRM());
+  }
+
   private NodeStatus getNodeStatusObject(NodeId nodeId) {
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setNodeId(nodeId);
@@ -832,12 +971,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
         hostFile.getAbsolutePath());
     conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
         FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
-    File tempDir = File.createTempFile("nattr", ".tmp");
-    tempDir.delete();
-    tempDir.mkdirs();
-    tempDir.deleteOnExit();
     conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
-        tempDir.getAbsolutePath());
+        TEMP_DIR.getAbsolutePath());
     rm = new MockRM(conf);
     rm.start();
 
@@ -905,6 +1040,287 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
   }
 
+  @Test
+  public void testNodeHeartbeatWithInvalidNodeAttributes() throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        FileSystemNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // Register to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    NodeAttribute validNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "host",
+            NodeAttributeType.STRING, "host2");
+    NodeAttribute invalidPrefixNodeAttribute = NodeAttribute
+        .newInstance("_P", "Attr1",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidNameNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "_N",
+            NodeAttributeType.STRING, "V2");
+    NodeAttribute invalidValueNodeAttribute = NodeAttribute
+        .newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "Attr2",
+            NodeAttributeType.STRING, "...");
+
+    // Set node attributes in HB.
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+    int responseId = nodeStatusObject.getResponseId();
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    heartbeatReq.setNodeAttributes(toSet(validNodeAttribute));
+
+    // Send first HB to RM with invalid prefix node attributes
+    heartbeatReq.setNodeAttributes(
+        toSet(validNodeAttribute, invalidPrefixNodeAttribute));
+    NodeHeartbeatResponse response =
+        resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .endsWith("attributes in HB must have prefix nm.yarn.io"));
+
+    // Send another HB to RM with invalid name node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq
+        .setNodeAttributes(toSet(validNodeAttribute, invalidNameNodeAttribute));
+    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute name should only contains"));
+
+    // Send another HB to RM with invalid value node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeAttributes(
+        toSet(validNodeAttribute, invalidValueNodeAttribute));
+    response = resourceTrackerService.nodeHeartbeat(heartbeatReq);
+    Assert.assertEquals(0, rm.getRMContext().getNodeAttributesManager()
+        .getAttributesForNode(nodeId.getHost()).size());
+    assertNodeHeartbeatResponseForInvalidAttributes(response);
+    Assert.assertTrue(response.getDiagnosticsMessage()
+        .startsWith("attribute value should only contains"));
+
+    // Send another HB to RM with updated node attribute
+    NodeAttribute updatedNodeAttribute = NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3");
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeAttributes(toSet(updatedNodeAttribute));
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    NodeAttributesManager attributeManager =
+        rm.getRMContext().getNodeAttributesManager();
+    Map<NodeAttribute, AttributeValue> attrs =
+        attributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    NodeAttribute na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host3", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+  }
+
+  private void assertNodeHeartbeatResponseForInvalidAttributes(
+      NodeHeartbeatResponse response) {
+    Assert.assertEquals(
+        "On Invalid Node Labels action is expected to be normal",
+        NodeAction.NORMAL, response.getNodeAction());
+    Assert.assertNotNull(response.getDiagnosticsMessage());
+    Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
+        response.getAreNodeLabelsAcceptedByRM());
+  }
+
+  @Test
+  public void testNodeHeartbeatOnlyUpdateNodeAttributesIfNeeded()
+      throws Exception {
+    writeToHostsFile("host2");
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
+        hostFile.getAbsolutePath());
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        NullNodeAttributeStore.class, NodeAttributeStore.class);
+    conf.set(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_ROOT_DIR,
+        TEMP_DIR.getAbsolutePath());
+    rm = new MockRM(conf);
+    rm.start();
+
+    // spy node attributes manager
+    NodeAttributesManager tmpAttributeManager =
+        rm.getRMContext().getNodeAttributesManager();
+    NodeAttributesManager spyAttributeManager = spy(tmpAttributeManager);
+    rm.getRMContext().setNodeAttributesManager(spyAttributeManager);
+    AtomicInteger count = new AtomicInteger(0);
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        count.incrementAndGet();
+        tmpAttributeManager
+            .replaceNodeAttributes((String) invocation.getArguments()[0],
+                (Map<String, Set<NodeAttribute>>) invocation.getArguments()[1]);
+        return null;
+      }
+    }).when(spyAttributeManager)
+        .replaceNodeAttributes(Mockito.any(String.class),
+            Mockito.any(Map.class));
+
+    // Register to RM
+    ResourceTrackerService resourceTrackerService =
+        rm.getResourceTrackerService();
+    RegisterNodeManagerRequest registerReq =
+        Records.newRecord(RegisterNodeManagerRequest.class);
+    NodeId nodeId = NodeId.newInstance("host2", 1234);
+    Resource capability = BuilderUtils.newResource(1024, 1);
+    registerReq.setResource(capability);
+    registerReq.setNodeId(nodeId);
+    registerReq.setHttpPort(1234);
+    registerReq.setNMVersion(YarnVersionInfo.getVersion());
+    RegisterNodeManagerResponse registerResponse =
+        resourceTrackerService.registerNodeManager(registerReq);
+
+    Set<NodeAttribute> nodeAttributes = new HashSet<>();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host2"));
+
+    // Set node attributes in HB.
+    NodeHeartbeatRequest heartbeatReq =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
+    int responseId = nodeStatusObject.getResponseId();
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
+        .getNMTokenMasterKey());
+    heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
+        .getContainerTokenMasterKey());
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Ensure RM gets correct node attributes update.
+    Map<NodeAttribute, AttributeValue> attrs = spyAttributeManager
+        .getAttributesForNode(nodeId.getHost());
+    spyAttributeManager.getNodesToAttributes(ImmutableSet.of(nodeId.getHost()));
+    Assert.assertEquals(1, attrs.size());
+    NodeAttribute na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host2", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+    Assert.assertEquals(1, count.get());
+
+    // Send HBs to RM with the same node attributes
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM updated node attributes once
+    Assert.assertEquals(1, count.get());
+
+    // Send another HB to RM with updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(1, attrs.size());
+    na = attrs.keySet().iterator().next();
+    Assert.assertEquals("host", na.getAttributeKey().getAttributeName());
+    Assert.assertEquals("host3", na.getAttributeValue());
+    Assert.assertEquals(NodeAttributeType.STRING, na.getAttributeType());
+
+    // Make sure RM updated node attributes twice
+    Assert.assertEquals(2, count.get());
+
+    // Add centralized attributes
+    Map<String, Set<NodeAttribute>> nodeAttributeMapping = ImmutableMap
+        .of(nodeId.getHost(), ImmutableSet.of(NodeAttribute.newInstance(
+            NodeAttribute.PREFIX_CENTRALIZED, "centAttr",
+            NodeAttributeType.STRING, "x")));
+    spyAttributeManager.replaceNodeAttributes(NodeAttribute.PREFIX_CENTRALIZED,
+        nodeAttributeMapping);
+
+    // Make sure RM updated node attributes three times
+    Assert.assertEquals(3, count.get());
+
+    // Send another HB to RM with non-updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host3"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM still updated node attributes three times
+    Assert.assertEquals(3, count.get());
+
+    // Send another HB to RM with updated node attributes
+    nodeAttributes.clear();
+    nodeAttributes.add(NodeAttribute.newInstance(
+        NodeAttribute.PREFIX_DISTRIBUTED, "host",
+        NodeAttributeType.STRING, "host4"));
+    nodeStatusObject.setResponseId(++responseId);
+    heartbeatReq.setNodeStatus(nodeStatusObject);
+    heartbeatReq.setNodeAttributes(nodeAttributes);
+    resourceTrackerService.nodeHeartbeat(heartbeatReq);
+
+    // Make sure RM gets the updated attribute
+    attrs = spyAttributeManager.getAttributesForNode(nodeId.getHost());
+    Assert.assertEquals(2, attrs.size());
+    attrs.keySet().stream().forEach(e -> {
+      Assert.assertEquals(NodeAttributeType.STRING, e.getAttributeType());
+      if (e.getAttributeKey().getAttributePrefix()
+          == NodeAttribute.PREFIX_DISTRIBUTED) {
+        Assert.assertEquals("host", e.getAttributeKey().getAttributeName());
+        Assert.assertEquals("host4", e.getAttributeValue());
+      } else if (e.getAttributeKey().getAttributePrefix()
+          == NodeAttribute.PREFIX_CENTRALIZED) {
+        Assert.assertEquals("centAttr", e.getAttributeKey().getAttributeName());
+        Assert.assertEquals("x", e.getAttributeValue());
+      }
+    });
+
+    // Make sure RM updated node attributes four times
+    Assert.assertEquals(4, count.get());
+
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
   @Test
   public void testNodeHeartBeatWithInvalidLabels() throws Exception {
     writeToHostsFile("host2");
@@ -2402,4 +2818,34 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     Assert.assertEquals(NodeAction.NORMAL, nodeHeartbeat.getNodeAction());
     Assert.assertEquals(1, nodeHeartbeat.getResponseId());
   }
+
+  /**
+   * A no-op implementation of NodeAttributeStore for testing.
+   */
+  public static class NullNodeAttributeStore implements NodeAttributeStore {
+
+    @Override
+    public void replaceNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    @Override
+    public void addNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    @Override
+    public void removeNodeAttributes(List<NodeToAttributes> nodeToAttribute) {
+    }
+
+    @Override
+    public void init(Configuration configuration, NodeAttributesManager mgr) {
+    }
+
+    @Override
+    public void recover() {
+    }
+
+    @Override
+    public void close() {
+    }
+  }
 }