|
@@ -19,21 +19,20 @@
|
|
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
|
|
package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
|
|
|
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Map.Entry;
|
|
import java.util.Map.Entry;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.TreeMap;
|
|
import java.util.TreeMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
-import java.util.concurrent.ConcurrentHashMap.KeySetView;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.List;
|
|
|
|
|
|
|
|
import com.google.common.base.Strings;
|
|
import com.google.common.base.Strings;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
@@ -42,6 +41,7 @@ import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.util.ReflectionUtils;
|
|
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
|
import org.apache.hadoop.yarn.api.records.NodeAttribute;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.nodelabels.StringAttributeValue;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
|
|
|
|
|
|
|
|
+import com.google.common.base.Strings;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Manager holding the attributes to Labels.
|
|
* Manager holding the attributes to Labels.
|
|
*/
|
|
*/
|
|
@@ -75,8 +77,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
|
|
|
|
// TODO may be we can have a better collection here.
|
|
// TODO may be we can have a better collection here.
|
|
// this will be updated to get the attributeName to NM mapping
|
|
// this will be updated to get the attributeName to NM mapping
|
|
- private ConcurrentHashMap<NodeAttribute, RMNodeAttribute> clusterAttributes =
|
|
|
|
- new ConcurrentHashMap<>();
|
|
|
|
|
|
+ private ConcurrentHashMap<NodeAttributeKey, RMNodeAttribute> clusterAttributes
|
|
|
|
+ = new ConcurrentHashMap<>();
|
|
|
|
|
|
// hostname -> (Map (attributeName -> NodeAttribute))
|
|
// hostname -> (Map (attributeName -> NodeAttribute))
|
|
// Instead of NodeAttribute, plan to have it in future as AttributeValue
|
|
// Instead of NodeAttribute, plan to have it in future as AttributeValue
|
|
@@ -149,7 +151,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
private void internalUpdateAttributesOnNodes(
|
|
private void internalUpdateAttributesOnNodes(
|
|
Map<String, Map<NodeAttribute, AttributeValue>> nodeAttributeMapping,
|
|
Map<String, Map<NodeAttribute, AttributeValue>> nodeAttributeMapping,
|
|
AttributeMappingOperationType op,
|
|
AttributeMappingOperationType op,
|
|
- Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded,
|
|
|
|
|
|
+ Map<NodeAttributeKey, RMNodeAttribute> newAttributesToBeAdded,
|
|
String attributePrefix) {
|
|
String attributePrefix) {
|
|
try {
|
|
try {
|
|
writeLock.lock();
|
|
writeLock.lock();
|
|
@@ -210,13 +212,14 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
private void removeNodeFromAttributes(String nodeHost,
|
|
private void removeNodeFromAttributes(String nodeHost,
|
|
Set<NodeAttribute> attributeMappings) {
|
|
Set<NodeAttribute> attributeMappings) {
|
|
for (NodeAttribute rmAttribute : attributeMappings) {
|
|
for (NodeAttribute rmAttribute : attributeMappings) {
|
|
- RMNodeAttribute host = clusterAttributes.get(rmAttribute);
|
|
|
|
|
|
+ RMNodeAttribute host =
|
|
|
|
+ clusterAttributes.get(rmAttribute.getAttributeKey());
|
|
if (host != null) {
|
|
if (host != null) {
|
|
host.removeNode(nodeHost);
|
|
host.removeNode(nodeHost);
|
|
// If there is no other host has such attribute,
|
|
// If there is no other host has such attribute,
|
|
// remove it from the global mapping.
|
|
// remove it from the global mapping.
|
|
if (host.getAssociatedNodeIds().isEmpty()) {
|
|
if (host.getAssociatedNodeIds().isEmpty()) {
|
|
- clusterAttributes.remove(rmAttribute);
|
|
|
|
|
|
+ clusterAttributes.remove(rmAttribute.getAttributeKey());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -224,12 +227,16 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
|
|
|
|
private void addNodeToAttribute(String nodeHost,
|
|
private void addNodeToAttribute(String nodeHost,
|
|
Map<NodeAttribute, AttributeValue> attributeMappings) {
|
|
Map<NodeAttribute, AttributeValue> attributeMappings) {
|
|
- for (NodeAttribute attribute : attributeMappings.keySet()) {
|
|
|
|
- RMNodeAttribute rmNodeAttribute = clusterAttributes.get(attribute);
|
|
|
|
|
|
+ for (Entry<NodeAttribute, AttributeValue> attributeEntry : attributeMappings
|
|
|
|
+ .entrySet()) {
|
|
|
|
+
|
|
|
|
+ RMNodeAttribute rmNodeAttribute =
|
|
|
|
+ clusterAttributes.get(attributeEntry.getKey().getAttributeKey());
|
|
if (rmNodeAttribute != null) {
|
|
if (rmNodeAttribute != null) {
|
|
- rmNodeAttribute.addNode(nodeHost);
|
|
|
|
|
|
+ rmNodeAttribute.addNode(nodeHost, attributeEntry.getValue());
|
|
} else {
|
|
} else {
|
|
- clusterAttributes.put(attribute, new RMNodeAttribute(attribute));
|
|
|
|
|
|
+ clusterAttributes.put(attributeEntry.getKey().getAttributeKey(),
|
|
|
|
+ new RMNodeAttribute(attributeEntry.getKey()));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -257,7 +264,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
*/
|
|
*/
|
|
protected Map<String, Map<NodeAttribute, AttributeValue>> validate(
|
|
protected Map<String, Map<NodeAttribute, AttributeValue>> validate(
|
|
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
|
|
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
|
|
- Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded,
|
|
|
|
|
|
+ Map<NodeAttributeKey, RMNodeAttribute> newAttributesToBeAdded,
|
|
boolean isRemoveOperation) throws IOException {
|
|
boolean isRemoveOperation) throws IOException {
|
|
Map<String, Map<NodeAttribute, AttributeValue>> nodeToAttributesMap =
|
|
Map<String, Map<NodeAttribute, AttributeValue>> nodeToAttributesMap =
|
|
new TreeMap<>();
|
|
new TreeMap<>();
|
|
@@ -274,19 +281,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
|
|
|
|
// validate for attributes
|
|
// validate for attributes
|
|
for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) {
|
|
for (NodeAttribute attribute : nodeToAttrMappingEntry.getValue()) {
|
|
- String attributeName = attribute.getAttributeName().trim();
|
|
|
|
|
|
+ NodeAttributeKey attributeKey = attribute.getAttributeKey();
|
|
|
|
+ String attributeName = attributeKey.getAttributeName().trim();
|
|
NodeLabelUtil.checkAndThrowLabelName(attributeName);
|
|
NodeLabelUtil.checkAndThrowLabelName(attributeName);
|
|
NodeLabelUtil
|
|
NodeLabelUtil
|
|
- .checkAndThrowAttributePrefix(attribute.getAttributePrefix());
|
|
|
|
|
|
+ .checkAndThrowAttributePrefix(attributeKey.getAttributePrefix());
|
|
|
|
|
|
// ensure trimmed values are set back
|
|
// ensure trimmed values are set back
|
|
- attribute.setAttributeName(attributeName);
|
|
|
|
- attribute.setAttributePrefix(attribute.getAttributePrefix().trim());
|
|
|
|
|
|
+ attributeKey.setAttributeName(attributeName);
|
|
|
|
+ attributeKey
|
|
|
|
+ .setAttributePrefix(attributeKey.getAttributePrefix().trim());
|
|
|
|
|
|
// verify for type against prefix/attributeName
|
|
// verify for type against prefix/attributeName
|
|
if (validateForAttributeTypeMismatch(isRemoveOperation, attribute,
|
|
if (validateForAttributeTypeMismatch(isRemoveOperation, attribute,
|
|
newAttributesToBeAdded)) {
|
|
newAttributesToBeAdded)) {
|
|
- newAttributesToBeAdded.put(attribute,
|
|
|
|
|
|
+ newAttributesToBeAdded.put(attribute.getAttributeKey(),
|
|
new RMNodeAttribute(attribute));
|
|
new RMNodeAttribute(attribute));
|
|
}
|
|
}
|
|
// TODO type based value setting needs to be done using a factory
|
|
// TODO type based value setting needs to be done using a factory
|
|
@@ -310,9 +319,11 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
*/
|
|
*/
|
|
private boolean validateForAttributeTypeMismatch(boolean isRemoveOperation,
|
|
private boolean validateForAttributeTypeMismatch(boolean isRemoveOperation,
|
|
NodeAttribute attribute,
|
|
NodeAttribute attribute,
|
|
- Map<NodeAttribute, RMNodeAttribute> newAttributes)
|
|
|
|
|
|
+ Map<NodeAttributeKey, RMNodeAttribute> newAttributes)
|
|
throws IOException {
|
|
throws IOException {
|
|
- if (isRemoveOperation && !clusterAttributes.containsKey(attribute)) {
|
|
|
|
|
|
+ NodeAttributeKey attributeKey = attribute.getAttributeKey();
|
|
|
|
+ if (isRemoveOperation
|
|
|
|
+ && !clusterAttributes.containsKey(attributeKey)) {
|
|
// no need to validate anything as its remove operation and attribute
|
|
// no need to validate anything as its remove operation and attribute
|
|
// doesn't exist.
|
|
// doesn't exist.
|
|
return false; // no need to add as its remove operation
|
|
return false; // no need to add as its remove operation
|
|
@@ -320,10 +331,10 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
// already existing or attribute is mapped to another Node in the
|
|
// already existing or attribute is mapped to another Node in the
|
|
// current command, then check whether the attribute type is matching
|
|
// current command, then check whether the attribute type is matching
|
|
NodeAttribute existingAttribute =
|
|
NodeAttribute existingAttribute =
|
|
- (clusterAttributes.containsKey((attribute))
|
|
|
|
- ? clusterAttributes.get(attribute).getAttribute()
|
|
|
|
- : (newAttributes.containsKey(attribute)
|
|
|
|
- ? newAttributes.get(attribute).getAttribute()
|
|
|
|
|
|
+ (clusterAttributes.containsKey(attributeKey)
|
|
|
|
+ ? clusterAttributes.get(attributeKey).getAttribute()
|
|
|
|
+ : (newAttributes.containsKey(attributeKey)
|
|
|
|
+ ? newAttributes.get(attributeKey).getAttribute()
|
|
: null));
|
|
: null));
|
|
if (existingAttribute == null) {
|
|
if (existingAttribute == null) {
|
|
return true;
|
|
return true;
|
|
@@ -331,7 +342,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
.getAttributeType()) {
|
|
.getAttributeType()) {
|
|
throw new IOException("Attribute name - type is not matching with "
|
|
throw new IOException("Attribute name - type is not matching with "
|
|
+ "already configured mapping for the attribute "
|
|
+ "already configured mapping for the attribute "
|
|
- + attribute.getAttributeName() + " existing : "
|
|
|
|
|
|
+ + attributeKey + " existing : "
|
|
+ existingAttribute.getAttributeType() + ", new :"
|
|
+ existingAttribute.getAttributeType() + ", new :"
|
|
+ attribute.getAttributeType());
|
|
+ attribute.getAttributeType());
|
|
}
|
|
}
|
|
@@ -347,37 +358,39 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public Set<NodeAttribute> getClusterNodeAttributes(Set<String> prefix) {
|
|
|
|
|
|
+ public Set<NodeAttribute> getClusterNodeAttributes(
|
|
|
|
+ Set<String> prefix) {
|
|
Set<NodeAttribute> attributes = new HashSet<>();
|
|
Set<NodeAttribute> attributes = new HashSet<>();
|
|
- KeySetView<NodeAttribute, RMNodeAttribute> allAttributes =
|
|
|
|
- clusterAttributes.keySet();
|
|
|
|
|
|
+ Set<Entry<NodeAttributeKey, RMNodeAttribute>> allAttributes =
|
|
|
|
+ clusterAttributes.entrySet();
|
|
// Return all if prefix is not given.
|
|
// Return all if prefix is not given.
|
|
- if (prefix == null || prefix.isEmpty()) {
|
|
|
|
- attributes.addAll(allAttributes);
|
|
|
|
- return attributes;
|
|
|
|
- }
|
|
|
|
|
|
+ boolean forAllPrefix = prefix == null || prefix.isEmpty();
|
|
// Try search attributes by prefix and return valid ones.
|
|
// Try search attributes by prefix and return valid ones.
|
|
- Iterator<NodeAttribute> iterator = allAttributes.iterator();
|
|
|
|
|
|
+ Iterator<Entry<NodeAttributeKey, RMNodeAttribute>> iterator =
|
|
|
|
+ allAttributes.iterator();
|
|
while (iterator.hasNext()) {
|
|
while (iterator.hasNext()) {
|
|
- NodeAttribute current = iterator.next();
|
|
|
|
- if (prefix.contains(current.getAttributePrefix())) {
|
|
|
|
- attributes.add(current);
|
|
|
|
|
|
+ Entry<NodeAttributeKey, RMNodeAttribute> current = iterator.next();
|
|
|
|
+ NodeAttributeKey attrID = current.getKey();
|
|
|
|
+ RMNodeAttribute rmAttr = current.getValue();
|
|
|
|
+ if (forAllPrefix || prefix.contains(attrID.getAttributePrefix())) {
|
|
|
|
+ attributes.add(rmAttr.getAttribute());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return attributes;
|
|
return attributes;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public Map<NodeAttribute, Set<String>> getAttributesToNodes(
|
|
|
|
- Set<NodeAttribute> attributes) {
|
|
|
|
|
|
+ public Map<NodeAttributeKey, Map<String, AttributeValue>> getAttributesToNodes(
|
|
|
|
+ Set<NodeAttributeKey> attributes) {
|
|
try {
|
|
try {
|
|
readLock.lock();
|
|
readLock.lock();
|
|
boolean fetchAllAttributes = (attributes == null || attributes.isEmpty());
|
|
boolean fetchAllAttributes = (attributes == null || attributes.isEmpty());
|
|
- Map<NodeAttribute, Set<String>> attributesToNodes = new HashMap<>();
|
|
|
|
- for (Entry<NodeAttribute, RMNodeAttribute> attributeEntry :
|
|
|
|
|
|
+ Map<NodeAttributeKey, Map<String, AttributeValue>> attributesToNodes =
|
|
|
|
+ new HashMap<>();
|
|
|
|
+ for (Entry<NodeAttributeKey, RMNodeAttribute> attributeEntry :
|
|
clusterAttributes.entrySet()) {
|
|
clusterAttributes.entrySet()) {
|
|
- if (fetchAllAttributes || attributes
|
|
|
|
- .contains(attributeEntry.getKey())) {
|
|
|
|
|
|
+ if (fetchAllAttributes
|
|
|
|
+ || attributes.contains(attributeEntry.getKey())) {
|
|
attributesToNodes.put(attributeEntry.getKey(),
|
|
attributesToNodes.put(attributeEntry.getKey(),
|
|
attributeEntry.getValue().getAssociatedNodeIds());
|
|
attributeEntry.getValue().getAssociatedNodeIds());
|
|
}
|
|
}
|
|
@@ -391,8 +404,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
public Resource getResourceByAttribute(NodeAttribute attribute) {
|
|
public Resource getResourceByAttribute(NodeAttribute attribute) {
|
|
try {
|
|
try {
|
|
readLock.lock();
|
|
readLock.lock();
|
|
- return clusterAttributes.containsKey(attribute)
|
|
|
|
- ? clusterAttributes.get(attribute).getResource()
|
|
|
|
|
|
+ return clusterAttributes.containsKey(attribute.getAttributeKey())
|
|
|
|
+ ? clusterAttributes.get(attribute.getAttributeKey()).getResource()
|
|
: Resource.newInstance(0, 0);
|
|
: Resource.newInstance(0, 0);
|
|
} finally {
|
|
} finally {
|
|
readLock.unlock();
|
|
readLock.unlock();
|
|
@@ -425,7 +438,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
attrs = new ArrayList<>();
|
|
attrs = new ArrayList<>();
|
|
for (Entry<NodeAttribute, AttributeValue> nodeAttr : v.attributes
|
|
for (Entry<NodeAttribute, AttributeValue> nodeAttr : v.attributes
|
|
.entrySet()) {
|
|
.entrySet()) {
|
|
- if (prefix.contains(nodeAttr.getKey().getAttributePrefix())) {
|
|
|
|
|
|
+ if (prefix.contains(
|
|
|
|
+ nodeAttr.getKey().getAttributeKey().getAttributePrefix())) {
|
|
attrs.add(nodeAttr.getKey());
|
|
attrs.add(nodeAttr.getKey());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -473,7 +487,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
}
|
|
}
|
|
host.activateNode(resource);
|
|
host.activateNode(resource);
|
|
for (NodeAttribute attribute : host.getAttributes().keySet()) {
|
|
for (NodeAttribute attribute : host.getAttributes().keySet()) {
|
|
- clusterAttributes.get(attribute).removeNode(resource);
|
|
|
|
|
|
+ clusterAttributes.get(attribute.getAttributeKey()).removeNode(resource);
|
|
}
|
|
}
|
|
} finally {
|
|
} finally {
|
|
writeLock.unlock();
|
|
writeLock.unlock();
|
|
@@ -485,7 +499,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
writeLock.lock();
|
|
writeLock.lock();
|
|
Host host = nodeCollections.get(nodeId.getHost());
|
|
Host host = nodeCollections.get(nodeId.getHost());
|
|
for (NodeAttribute attribute : host.getAttributes().keySet()) {
|
|
for (NodeAttribute attribute : host.getAttributes().keySet()) {
|
|
- clusterAttributes.get(attribute).removeNode(host.getResource());
|
|
|
|
|
|
+ clusterAttributes.get(attribute.getAttributeKey())
|
|
|
|
+ .removeNode(host.getResource());
|
|
}
|
|
}
|
|
host.deactivateNode();
|
|
host.deactivateNode();
|
|
} finally {
|
|
} finally {
|
|
@@ -531,7 +546,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
this.attributes.entrySet().iterator();
|
|
this.attributes.entrySet().iterator();
|
|
while (it.hasNext()) {
|
|
while (it.hasNext()) {
|
|
Entry<NodeAttribute, AttributeValue> current = it.next();
|
|
Entry<NodeAttribute, AttributeValue> current = it.next();
|
|
- if (prefix.equals(current.getKey().getAttributePrefix())) {
|
|
|
|
|
|
+ if (prefix.equals(
|
|
|
|
+ current.getKey().getAttributeKey().getAttributePrefix())) {
|
|
it.remove();
|
|
it.remove();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -659,7 +675,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager {
|
|
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
|
|
Map<String, Set<NodeAttribute>> nodeAttributeMapping,
|
|
AttributeMappingOperationType mappingType, String attributePrefix)
|
|
AttributeMappingOperationType mappingType, String attributePrefix)
|
|
throws IOException {
|
|
throws IOException {
|
|
- Map<NodeAttribute, RMNodeAttribute> newAttributesToBeAdded =
|
|
|
|
|
|
+ Map<NodeAttributeKey, RMNodeAttribute> newAttributesToBeAdded =
|
|
new HashMap<>();
|
|
new HashMap<>();
|
|
Map<String, Map<NodeAttribute, AttributeValue>> validMapping =
|
|
Map<String, Map<NodeAttribute, AttributeValue>> validMapping =
|
|
validate(nodeAttributeMapping, newAttributesToBeAdded, false);
|
|
validate(nodeAttributeMapping, newAttributesToBeAdded, false);
|