소스 검색

YARN-2494. Added NodeLabels Manager internal API and implementation. Contributed by Wangda Tan.

(cherry picked from commit db7f1653198b950e89567c06898d64f6b930a0ee)
Vinod Kumar Vavilapalli 10 년 전
부모
커밋
bb6c79f76c
17개의 변경된 파일2785개의 추가작업 그리고 0개의 파일을 삭제
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 722 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java
  4. 255 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java
  5. 69 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java
  6. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java
  7. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java
  8. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java
  9. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java
  10. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java
  11. 81 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java
  12. 76 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java
  13. 261 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java
  14. 252 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java
  15. 447 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
  16. 83 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java
  17. 367 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -93,6 +93,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2544. Added admin-API objects for using node-labels. (Wangda Tan via
     vinodkv)
 
+    YARN-2494. Added NodeLabels Manager internal API and implementation. (Wangda
+    Tan via vinodkv)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1376,6 +1376,17 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_HTTP_POLICY_KEY = YARN_PREFIX + "http.policy";
   public static final String YARN_HTTP_POLICY_DEFAULT = HttpConfig.Policy.HTTP_ONLY
       .name();
+  
+  public static final String NODE_LABELS_PREFIX = YARN_PREFIX + "node-labels.";
+
+  /** URI for NodeLabelManager */
+  public static final String FS_NODE_LABELS_STORE_URI = NODE_LABELS_PREFIX
+      + "fs-store.uri";
+  public static final String DEFAULT_FS_NODE_LABELS_STORE_URI = "file:///tmp/";
+  public static final String FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
+      NODE_LABELS_PREFIX + "fs-store.retry-policy-spec";
+  public static final String DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC =
+      "2000, 500";
 
   public YarnConfiguration() {
     super();

+ 722 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java

@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
+import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent;
+import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType;
+import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels;
+import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.ImmutableSet;
+
+public class CommonNodeLabelsManager extends AbstractService {
+  protected static final Log LOG = LogFactory.getLog(CommonNodeLabelsManager.class);
+  private static final int MAX_LABEL_LENGTH = 255;
+  public static final Set<String> EMPTY_STRING_SET = Collections
+      .unmodifiableSet(new HashSet<String>(0));
+  public static final String ANY = "*";
+  public static final Set<String> ACCESS_ANY_LABEL_SET = ImmutableSet.of(ANY);
+  private static final Pattern LABEL_PATTERN = Pattern
+      .compile("^[0-9a-zA-Z][0-9a-zA-Z-_]*");
+  public static final int WILDCARD_PORT = 0;
+
+  /**
+   * If a user doesn't specify label of a queue or node, it belongs
+   * DEFAULT_LABEL
+   */
+  public static final String NO_LABEL = "";
+
+  protected Dispatcher dispatcher;
+
+  protected ConcurrentMap<String, Label> labelCollections =
+      new ConcurrentHashMap<String, Label>();
+  protected ConcurrentMap<String, Host> nodeCollections =
+      new ConcurrentHashMap<String, Host>();
+
+  protected final ReadLock readLock;
+  protected final WriteLock writeLock;
+
+  protected NodeLabelsStore store;
+
+  protected static class Label {
+    public Resource resource;
+
+    protected Label() {
+      this.resource = Resource.newInstance(0, 0);
+    }
+  }
+
+  /**
+   * A <code>Host</code> can have multiple <code>Node</code>s 
+   */
+  protected static class Host {
+    public Set<String> labels;
+    public Map<NodeId, Node> nms;
+    
+    protected Host() {
+      labels =
+          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      nms = new ConcurrentHashMap<NodeId, Node>();
+    }
+    
+    public Host copy() {
+      Host c = new Host();
+      c.labels = new HashSet<String>(labels);
+      for (Entry<NodeId, Node> entry : nms.entrySet()) {
+        c.nms.put(entry.getKey(), entry.getValue().copy());
+      }
+      return c;
+    }
+  }
+  
+  protected static class Node {
+    public Set<String> labels;
+    public Resource resource;
+    public boolean running;
+    
+    protected Node() {
+      labels = null;
+      resource = Resource.newInstance(0, 0);
+      running = false;
+    }
+    
+    public Node copy() {
+      Node c = new Node();
+      if (labels != null) {
+        c.labels =
+            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      } else {
+        c.labels = null;
+      }
+      c.resource = Resources.clone(resource);
+      c.running = running;
+      return c;
+    }
+  }
+
+  private final class ForwardingEventHandler implements
+      EventHandler<NodeLabelsStoreEvent> {
+
+    @Override
+    public void handle(NodeLabelsStoreEvent event) {
+      if (isInState(STATE.STARTED)) {
+        handleStoreEvent(event);
+      }
+    }
+  }
+  
+  // Dispatcher related code
+  protected void handleStoreEvent(NodeLabelsStoreEvent event) {
+    try {
+      switch (event.getType()) {
+      case ADD_LABELS:
+        StoreNewClusterNodeLabels storeNewClusterNodeLabelsEvent =
+            (StoreNewClusterNodeLabels) event;
+        store.storeNewClusterNodeLabels(storeNewClusterNodeLabelsEvent
+             .getLabels());
+        break;
+      case REMOVE_LABELS:
+        RemoveClusterNodeLabels removeClusterNodeLabelsEvent =
+            (RemoveClusterNodeLabels) event;
+        store.removeClusterNodeLabels(removeClusterNodeLabelsEvent.getLabels());
+        break;
+      case STORE_NODE_TO_LABELS:
+        UpdateNodeToLabelsMappingsEvent updateNodeToLabelsMappingsEvent =
+            (UpdateNodeToLabelsMappingsEvent) event;
+        store.updateNodeToLabelsMappings(updateNodeToLabelsMappingsEvent
+            .getNodeToLabels());
+        break;
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to store label modification to storage");
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  public CommonNodeLabelsManager() {
+    super(CommonNodeLabelsManager.class.getName());
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
+  // for UT purpose
+  protected void initDispatcher(Configuration conf) {
+    // create async handler
+    dispatcher = new AsyncDispatcher();
+    AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
+    asyncDispatcher.init(conf);
+    asyncDispatcher.setDrainEventsOnStop();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    initNodeLabelStore(conf);
+    
+    labelCollections.put(NO_LABEL, new Label());
+  }
+
+  protected void initNodeLabelStore(Configuration conf) throws Exception {
+    this.store = new FileSystemNodeLabelsStore(this);
+    this.store.init(conf);
+    this.store.recover();
+  }
+
+  // for UT purpose
+  protected void startDispatcher() {
+    // start dispatcher
+    AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
+    asyncDispatcher.start();
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // init dispatcher only when service start, because recover will happen in
+    // service init, we don't want to trigger any event handling at that time.
+    initDispatcher(getConfig());
+
+    dispatcher.register(NodeLabelsStoreEventType.class,
+        new ForwardingEventHandler());
+
+    startDispatcher();
+  }
+  
+  // for UT purpose
+  protected void stopDispatcher() {
+    AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
+    asyncDispatcher.stop();
+  }
+  
+  @Override
+  protected void serviceStop() throws Exception {
+    // finalize store
+    stopDispatcher();
+    store.close();
+  }
+
+  /**
+   * Add multiple node labels to repository
+   * 
+   * @param labels
+   *          new node labels added
+   */
+  @SuppressWarnings("unchecked")
+  public void addToCluserNodeLabels(Set<String> labels) throws IOException {
+    if (null == labels || labels.isEmpty()) {
+      return;
+    }
+
+    labels = normalizeLabels(labels);
+
+    // do a check before actual adding them, will throw exception if any of them
+    // doesn't meet label name requirement
+    for (String label : labels) {
+      checkAndThrowLabelName(label);
+    }
+
+    for (String label : labels) {
+      this.labelCollections.put(label, new Label());
+    }
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new StoreNewClusterNodeLabels(labels));
+    }
+
+    LOG.info("Add labels: [" + StringUtils.join(labels.iterator(), ",") + "]");
+  }
+  
+  protected void checkAddLabelsToNode(
+      Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
+    if (null == addedLabelsToNode || addedLabelsToNode.isEmpty()) {
+      return;
+    }
+
+    // check all labels being added existed
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
+      if (!knownLabels.containsAll(entry.getValue())) {
+        String msg =
+            "Not all labels being added contained by known "
+                + "label collections, please check" + ", added labels=["
+                + StringUtils.join(entry.getValue(), ",") + "]";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  protected void internalAddLabelsToNode(
+      Map<NodeId, Set<String>> addedLabelsToNode) throws IOException {
+    // do add labels to nodes
+    Map<NodeId, Set<String>> newNMToLabels =
+        new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      Set<String> labels = entry.getValue();
+      
+      createNodeIfNonExisted(entry.getKey());
+      
+      if (nodeId.getPort() == WILDCARD_PORT) {
+        Host host = nodeCollections.get(nodeId.getHost());
+        host.labels.addAll(labels);
+        newNMToLabels.put(nodeId, host.labels);
+      } else {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels == null) {
+          nm.labels = new HashSet<String>();
+        }
+        nm.labels.addAll(labels);
+        newNMToLabels.put(nodeId, nm.labels);
+      }
+    }
+
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
+    }
+
+    // shows node->labels we added
+    LOG.info("addLabelsToNode:");
+    for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
+      LOG.info("  NM=" + entry.getKey() + ", labels=["
+          + StringUtils.join(entry.getValue().iterator(), ",") + "]");
+    }
+  }
+  
+  /**
+   * add more labels to nodes
+   * 
+   * @param addedLabelsToNode node -> labels map
+   */
+  public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
+      throws IOException {
+    checkAddLabelsToNode(addedLabelsToNode);
+    internalAddLabelsToNode(addedLabelsToNode);
+  }
+  
+  protected void checkRemoveFromClusterNodeLabels(
+      Collection<String> labelsToRemove) throws IOException {
+    if (null == labelsToRemove || labelsToRemove.isEmpty()) {
+      return;
+    }
+
+    // Check if label to remove doesn't existed or null/empty, will throw
+    // exception if any of labels to remove doesn't meet requirement
+    for (String label : labelsToRemove) {
+      label = normalizeLabel(label);
+      if (label == null || label.isEmpty()) {
+        throw new IOException("Label to be removed is null or empty");
+      }
+      
+      if (!labelCollections.containsKey(label)) {
+        throw new IOException("Node label=" + label
+            + " to be removed doesn't existed in cluster "
+            + "node labels collection.");
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  protected void internalRemoveFromClusterNodeLabels(Collection<String> labelsToRemove) {
+    // remove labels from nodes
+    for (String nodeName : nodeCollections.keySet()) {
+      Host host = nodeCollections.get(nodeName);
+      if (null != host) {
+        host.labels.removeAll(labelsToRemove);
+        for (Node nm : host.nms.values()) {
+          if (nm.labels != null) {
+            nm.labels.removeAll(labelsToRemove);
+          }
+        }
+      }
+    }
+
+    // remove labels from node labels collection
+    for (String label : labelsToRemove) {
+      labelCollections.remove(label);
+    }
+
+    // create event to remove labels
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new RemoveClusterNodeLabels(labelsToRemove));
+    }
+
+    LOG.info("Remove labels: ["
+        + StringUtils.join(labelsToRemove.iterator(), ",") + "]");
+  }
+
+  /**
+   * Remove multiple node labels from repository
+   * 
+   * @param labelsToRemove
+   *          node labels to remove
+   * @throws IOException
+   */
+  public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
+      throws IOException {
+    checkRemoveFromClusterNodeLabels(labelsToRemove);
+
+    internalRemoveFromClusterNodeLabels(labelsToRemove);
+  }
+  
+  protected void checkRemoveLabelsFromNode(
+      Map<NodeId, Set<String>> removeLabelsFromNode) throws IOException {
+    // check all labels being added existed
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      Set<String> labels = entry.getValue();
+      
+      if (!knownLabels.containsAll(labels)) {
+        String msg =
+            "Not all labels being removed contained by known "
+                + "label collections, please check" + ", removed labels=["
+                + StringUtils.join(labels, ",") + "]";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+      
+      Set<String> originalLabels = null;
+      
+      boolean nodeExisted = false;
+      if (WILDCARD_PORT != nodeId.getPort()) {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm != null) {
+          originalLabels = nm.labels;
+          nodeExisted = true;
+        }
+      } else {
+        Host host = nodeCollections.get(nodeId.getHost());
+        if (null != host) {
+          originalLabels = host.labels;
+          nodeExisted = true;
+        }
+      }
+      
+      if (!nodeExisted) {
+        String msg =
+            "Try to remove labels from NM=" + nodeId
+                + ", but the NM doesn't existed";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+      
+      if (labels == null || labels.isEmpty()) {
+        continue;
+      }
+
+      if (!originalLabels.containsAll(labels)) {
+        String msg =
+            "Try to remove labels = [" + StringUtils.join(labels, ",")
+                + "], but not all labels contained by NM=" + nodeId;
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  protected void internalRemoveLabelsFromNode(
+      Map<NodeId, Set<String>> removeLabelsFromNode) {
+    // do remove labels from nodes
+    Map<NodeId, Set<String>> newNMToLabels =
+        new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> entry : removeLabelsFromNode.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      Set<String> labels = entry.getValue();
+      
+      if (nodeId.getPort() == WILDCARD_PORT) {
+        Host host = nodeCollections.get(nodeId.getHost());
+        host.labels.removeAll(labels);
+        newNMToLabels.put(nodeId, host.labels);
+      } else {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels != null) {
+          nm.labels.removeAll(labels);
+          newNMToLabels.put(nodeId, nm.labels);
+        }
+      }
+    }
+    
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
+    }
+
+    // shows node->labels we added
+    LOG.info("removeLabelsFromNode:");
+    for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
+      LOG.info("  NM=" + entry.getKey() + ", labels=["
+          + StringUtils.join(entry.getValue().iterator(), ",") + "]");
+    }
+  }
+  
+  /**
+   * remove labels from nodes, labels being removed most be contained by these
+   * nodes
+   * 
+   * @param removeLabelsFromNode node -> labels map
+   */
+  public void
+      removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
+          throws IOException {
+    checkRemoveLabelsFromNode(removeLabelsFromNode);
+
+    internalRemoveLabelsFromNode(removeLabelsFromNode);
+  }
+  
+  protected void checkReplaceLabelsOnNode(
+      Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
+    if (null == replaceLabelsToNode || replaceLabelsToNode.isEmpty()) {
+      return;
+    }
+    
+    // check all labels being added existed
+    Set<String> knownLabels = labelCollections.keySet();
+    for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
+      if (!knownLabels.containsAll(entry.getValue())) {
+        String msg =
+            "Not all labels being replaced contained by known "
+                + "label collections, please check" + ", new labels=["
+                + StringUtils.join(entry.getValue(), ",") + "]";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  protected void internalReplaceLabelsOnNode(
+      Map<NodeId, Set<String>> replaceLabelsToNode) {
+    // do replace labels to nodes
+    Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
+    for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
+      NodeId nodeId = entry.getKey();
+      Set<String> labels = entry.getValue();
+
+      // update nodeCollections
+      createNodeIfNonExisted(entry.getKey());
+      if (nodeId.getPort() == WILDCARD_PORT) {
+        Host host = nodeCollections.get(nodeId.getHost());
+        host.labels.clear();
+        host.labels.addAll(labels);
+        newNMToLabels.put(nodeId, host.labels);
+      } else {
+        Node nm = getNMInNodeSet(nodeId);
+        if (nm.labels == null) {
+          nm.labels = new HashSet<String>();
+        }
+        nm.labels.clear();
+        nm.labels.addAll(labels);
+        newNMToLabels.put(nodeId, nm.labels);
+      }
+    }
+
+    if (null != dispatcher) {
+      dispatcher.getEventHandler().handle(
+          new UpdateNodeToLabelsMappingsEvent(newNMToLabels));
+    }
+
+    // shows node->labels we added
+    LOG.info("setLabelsToNode:");
+    for (Entry<NodeId, Set<String>> entry : newNMToLabels.entrySet()) {
+      LOG.info("  NM=" + entry.getKey() + ", labels=["
+          + StringUtils.join(entry.getValue().iterator(), ",") + "]");
+    }
+  }
+  
+  /**
+   * replace labels to nodes
+   * 
+   * @param replaceLabelsToNode node -> labels map
+   */
+  public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
+      throws IOException {
+    checkReplaceLabelsOnNode(replaceLabelsToNode);
+
+    internalReplaceLabelsOnNode(replaceLabelsToNode);
+  }
+
+  /**
+   * Get mapping of nodes to labels
+   * 
+   * @return nodes to labels map
+   */
+  public Map<NodeId, Set<String>> getNodeLabels() {
+    try {
+      readLock.lock();
+      Map<NodeId, Set<String>> nodeToLabels =
+          new HashMap<NodeId, Set<String>>();
+      for (Entry<String, Host> entry : nodeCollections.entrySet()) {
+        String hostName = entry.getKey();
+        Host host = entry.getValue();
+        for (NodeId nodeId : host.nms.keySet()) {
+          Set<String> nodeLabels = getLabelsByNode(nodeId);
+          if (nodeLabels == null || nodeLabels.isEmpty()) {
+            continue;
+          }
+          nodeToLabels.put(nodeId, nodeLabels);
+        }
+        if (!host.labels.isEmpty()) {
+          nodeToLabels
+              .put(NodeId.newInstance(hostName, WILDCARD_PORT), host.labels);
+        }
+      }
+      return Collections.unmodifiableMap(nodeToLabels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get existing valid labels in repository
+   * 
+   * @return existing valid labels in repository
+   */
+  public Set<String> getClusterNodeLabels() {
+    try {
+      readLock.lock();
+      Set<String> labels = new HashSet<String>(labelCollections.keySet());
+      labels.remove(NO_LABEL);
+      return Collections.unmodifiableSet(labels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private void checkAndThrowLabelName(String label) throws IOException {
+    if (label == null || label.isEmpty() || label.length() > MAX_LABEL_LENGTH) {
+      throw new IOException("label added is empty or exceeds "
+          + MAX_LABEL_LENGTH + " character(s)");
+    }
+    label = label.trim();
+
+    boolean match = LABEL_PATTERN.matcher(label).matches();
+
+    if (!match) {
+      throw new IOException("label name should only contains "
+          + "{0-9, a-z, A-Z, -, _} and should not started with {-,_}"
+          + ", now it is=" + label);
+    }
+  }
+
+  protected String normalizeLabel(String label) {
+    if (label != null) {
+      return label.trim();
+    }
+    return NO_LABEL;
+  }
+
+  private Set<String> normalizeLabels(Set<String> labels) {
+    Set<String> newLabels = new HashSet<String>();
+    for (String label : labels) {
+      newLabels.add(normalizeLabel(label));
+    }
+    return newLabels;
+  }
+  
+  protected Node getNMInNodeSet(NodeId nodeId) {
+    return getNMInNodeSet(nodeId, nodeCollections);
+  }
+  
+  protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map) {
+    return getNMInNodeSet(nodeId, map, false);
+  }
+
+  protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
+      boolean checkRunning) {
+    if (WILDCARD_PORT == nodeId.getPort()) {
+      return null;
+    }
+    
+    Host host = map.get(nodeId.getHost());
+    if (null == host) {
+      return null;
+    }
+    Node nm = host.nms.get(nodeId);
+    if (null == nm) {
+      return null;
+    }
+    if (checkRunning) {
+      return nm.running ? nm : null; 
+    }
+    return nm;
+  }
+  
+  protected Set<String> getLabelsByNode(NodeId nodeId) {
+    return getLabelsByNode(nodeId, nodeCollections);
+  }
+  
+  protected Set<String> getLabelsByNode(NodeId nodeId, Map<String, Host> map) {
+    Host host = map.get(nodeId.getHost());
+    if (null == host) {
+      return EMPTY_STRING_SET;
+    }
+    Node nm = host.nms.get(nodeId);
+    if (null != nm && null != nm.labels) {
+      return nm.labels;
+    } else {
+      return host.labels;
+    }
+  }
+  
+  protected void createNodeIfNonExisted(NodeId nodeId) {
+    Host host = nodeCollections.get(nodeId.getHost());
+    if (null == host) {
+      host = new Host();
+      nodeCollections.put(nodeId.getHost(), host);
+    }
+    if (nodeId.getPort() != WILDCARD_PORT) {
+      Node nm = host.nms.get(nodeId);
+      if (null == nm) {
+        host.nms.put(nodeId, new Node());
+      }
+    }
+  }
+}

+ 255 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/FileSystemNodeLabelsStore.java

@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
+
+import com.google.common.collect.Sets;
+
+public class FileSystemNodeLabelsStore extends NodeLabelsStore {
+
+  public FileSystemNodeLabelsStore(CommonNodeLabelsManager mgr) {
+    super(mgr);
+  }
+
+  protected static final Log LOG = LogFactory.getLog(FileSystemNodeLabelsStore.class);
+
+  protected static final String ROOT_DIR_NAME = "FSNodeLabelManagerRoot";
+  protected static final String MIRROR_FILENAME = "nodelabel.mirror";
+  protected static final String EDITLOG_FILENAME = "nodelabel.editlog";
+  
+  protected enum SerializedLogType {
+    ADD_LABELS, NODE_TO_LABELS, REMOVE_LABELS
+  }
+
+  Path fsWorkingPath;
+  Path rootDirPath;
+  FileSystem fs;
+  FSDataOutputStream editlogOs;
+  Path editLogPath;
+
+  @Override
+  public void init(Configuration conf) throws Exception {
+    fsWorkingPath =
+        new Path(conf.get(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
+            YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_URI));
+    rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+
+    setFileSystem(conf);
+
+    // mkdir of root dir path
+    fs.mkdirs(rootDirPath);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      fs.close();
+      editlogOs.close();
+    } catch (IOException e) {
+      LOG.warn("Exception happened whiling shutting down,", e);
+    }
+  }
+
+  private void setFileSystem(Configuration conf) throws IOException {
+    Configuration confCopy = new Configuration(conf);
+    confCopy.setBoolean("dfs.client.retry.policy.enabled", true);
+    String retryPolicy =
+        confCopy.get(YarnConfiguration.FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC,
+            YarnConfiguration.DEFAULT_FS_NODE_LABELS_STORE_RETRY_POLICY_SPEC);
+    confCopy.set("dfs.client.retry.policy.spec", retryPolicy);
+    fs = fsWorkingPath.getFileSystem(confCopy);
+    
+    // if it's local file system, use RawLocalFileSystem instead of
+    // LocalFileSystem, the latter one doesn't support append.
+    if (fs.getScheme().equals("file")) {
+      fs = ((LocalFileSystem)fs).getRaw();
+    }
+  }
+  
+  private void ensureAppendEditlogFile() throws IOException {
+    editlogOs = fs.append(editLogPath);
+  }
+  
+  private void ensureCloseEditlogFile() throws IOException {
+    editlogOs.close();
+  }
+
+  @Override
+  public void updateNodeToLabelsMappings(
+      Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+    ensureAppendEditlogFile();
+    editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
+    ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
+        .newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
+    ensureCloseEditlogFile();
+  }
+
+  @Override
+  public void storeNewClusterNodeLabels(Set<String> labels)
+      throws IOException {
+    ensureAppendEditlogFile();
+    editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
+    ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest.newInstance(labels)).getProto()
+        .writeDelimitedTo(editlogOs);
+    ensureCloseEditlogFile();
+  }
+
+  @Override
+  public void removeClusterNodeLabels(Collection<String> labels)
+      throws IOException {
+    ensureAppendEditlogFile();
+    editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
+    ((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
+        .newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
+    ensureCloseEditlogFile();
+  }
+
+  @Override
+  public void recover() throws IOException {
+    /*
+     * Steps of recover
+     * 1) Read from last mirror (from mirror or mirror.old)
+     * 2) Read from last edit log, and apply such edit log
+     * 3) Write new mirror to mirror.writing
+     * 4) Rename mirror to mirror.old
+     * 5) Move mirror.writing to mirror
+     * 6) Remove mirror.old
+     * 7) Remove edit log and create a new empty edit log 
+     */
+    
+    // Open mirror from serialized file
+    Path mirrorPath = new Path(rootDirPath, MIRROR_FILENAME);
+    Path oldMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".old");
+
+    FSDataInputStream is = null;
+    if (fs.exists(mirrorPath)) {
+      is = fs.open(mirrorPath);
+    } else if (fs.exists(oldMirrorPath)) {
+      is = fs.open(oldMirrorPath);
+    }
+
+    if (null != is) {
+      Set<String> labels =
+          new AddToClusterNodeLabelsRequestPBImpl(
+              AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)).getNodeLabels();
+      Map<NodeId, Set<String>> nodeToLabels =
+          new ReplaceLabelsOnNodeRequestPBImpl(
+              ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
+              .getNodeToLabels();
+      mgr.addToCluserNodeLabels(labels);
+      mgr.replaceLabelsOnNode(nodeToLabels);
+      is.close();
+    }
+
+    // Open and process editlog
+    editLogPath = new Path(rootDirPath, EDITLOG_FILENAME);
+    if (fs.exists(editLogPath)) {
+      is = fs.open(editLogPath);
+
+      while (true) {
+        try {
+          // read edit log one by one
+          SerializedLogType type = SerializedLogType.values()[is.readInt()];
+          
+          switch (type) {
+          case ADD_LABELS: {
+            Collection<String> labels =
+                AddToClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
+                    .getNodeLabelsList();
+            mgr.addToCluserNodeLabels(Sets.newHashSet(labels.iterator()));
+            break;
+          }
+          case REMOVE_LABELS: {
+            Collection<String> labels =
+                RemoveFromClusterNodeLabelsRequestProto.parseDelimitedFrom(is)
+                    .getNodeLabelsList();
+            mgr.removeFromClusterNodeLabels(labels);
+            break;
+          }
+          case NODE_TO_LABELS: {
+            Map<NodeId, Set<String>> map =
+                new ReplaceLabelsOnNodeRequestPBImpl(
+                    ReplaceLabelsOnNodeRequestProto.parseDelimitedFrom(is))
+                    .getNodeToLabels();
+            mgr.replaceLabelsOnNode(map);
+            break;
+          }
+          }
+        } catch (EOFException e) {
+          // EOF hit, break
+          break;
+        }
+      }
+    }
+
+    // Serialize current mirror to mirror.writing
+    Path writingMirrorPath = new Path(rootDirPath, MIRROR_FILENAME + ".writing");
+    FSDataOutputStream os = fs.create(writingMirrorPath, true);
+    ((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequestPBImpl
+        .newInstance(mgr.getClusterNodeLabels())).getProto().writeDelimitedTo(os);
+    ((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
+        .newInstance(mgr.getNodeLabels())).getProto().writeDelimitedTo(os);
+    os.close();
+    
+    // Move mirror to mirror.old
+    if (fs.exists(mirrorPath)) {
+      fs.delete(oldMirrorPath, false);
+      fs.rename(mirrorPath, oldMirrorPath);
+    }
+    
+    // move mirror.writing to mirror
+    fs.rename(writingMirrorPath, mirrorPath);
+    fs.delete(writingMirrorPath, false);
+    
+    // remove mirror.old
+    fs.delete(oldMirrorPath, false);
+    
+    // create a new editlog file
+    editlogOs = fs.create(editLogPath, true);
+    editlogOs.close();
+    
+    LOG.info("Finished write mirror at:" + mirrorPath.toString());
+    LOG.info("Finished create editlog file at:" + editLogPath.toString());
+  }
+}

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/NodeLabelsStore.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public abstract class NodeLabelsStore implements Closeable {
+  protected final CommonNodeLabelsManager mgr;
+  protected Configuration conf;
+  
+  public NodeLabelsStore(CommonNodeLabelsManager mgr) {
+    this.mgr = mgr;
+  }
+  
+  /**
+   * Store node -> label
+   */
+  public abstract void updateNodeToLabelsMappings(
+      Map<NodeId, Set<String>> nodeToLabels) throws IOException;
+
+  /**
+   * Store new labels
+   */
+  public abstract void storeNewClusterNodeLabels(Set<String> label)
+      throws IOException;
+
+  /**
+   * Remove labels
+   */
+  public abstract void removeClusterNodeLabels(Collection<String> labels)
+      throws IOException;
+  
+  /**
+   * Recover labels and node to labels mappings from store
+   * @param conf
+   */
+  public abstract void recover() throws IOException;
+  
+  public void init(Configuration conf) throws Exception {
+    this.conf = conf;
+  }
+  
+  public CommonNodeLabelsManager getNodeLabelsManager() {
+    return mgr;
+  }
+}

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEvent.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NodeLabelsStoreEvent extends
+    AbstractEvent<NodeLabelsStoreEventType> {
+  public NodeLabelsStoreEvent(NodeLabelsStoreEventType type) {
+    super(type);
+  }
+}

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/NodeLabelsStoreEventType.java

@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+public enum NodeLabelsStoreEventType {
+  REMOVE_LABELS,
+  ADD_LABELS,
+  STORE_NODE_TO_LABELS
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/RemoveClusterNodeLabels.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Collection;
+
+public class RemoveClusterNodeLabels extends NodeLabelsStoreEvent {
+  private Collection<String> labels;
+  
+  public RemoveClusterNodeLabels(Collection<String> labels) {
+    super(NodeLabelsStoreEventType.REMOVE_LABELS);
+    this.labels = labels;
+  }
+  
+  public Collection<String> getLabels() {
+    return labels;
+  }
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/StoreNewClusterNodeLabels.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Set;
+
+public class StoreNewClusterNodeLabels extends NodeLabelsStoreEvent {
+  private Set<String> labels;
+  
+  public StoreNewClusterNodeLabels(Set<String> labels) {
+    super(NodeLabelsStoreEventType.ADD_LABELS);
+    this.labels = labels;
+  }
+  
+  public Set<String> getLabels() {
+    return labels;
+  }
+}

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/event/UpdateNodeToLabelsMappingsEvent.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class UpdateNodeToLabelsMappingsEvent extends NodeLabelsStoreEvent {
+  private Map<NodeId, Set<String>> nodeToLabels;
+
+  public UpdateNodeToLabelsMappingsEvent(Map<NodeId, Set<String>> nodeToLabels) {
+    super(NodeLabelsStoreEventType.STORE_NODE_TO_LABELS);
+    this.nodeToLabels = nodeToLabels;
+  }
+
+  public Map<NodeId, Set<String>> getNodeToLabels() {
+    return nodeToLabels;
+  }
+}

+ 81 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+
+public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
+  Map<NodeId, Set<String>> lastNodeToLabels = null;
+  Collection<String> lastAddedlabels = null;
+  Collection<String> lastRemovedlabels = null;
+
+  @Override
+  public void initNodeLabelStore(Configuration conf) {
+    this.store = new NodeLabelsStore(this) {
+
+      @Override
+      public void recover() throws IOException {
+      }
+
+      @Override
+      public void removeClusterNodeLabels(Collection<String> labels)
+          throws IOException {
+        lastRemovedlabels = labels;
+      }
+
+      @Override
+      public void updateNodeToLabelsMappings(
+          Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+        lastNodeToLabels = nodeToLabels;
+      }
+
+      @Override
+      public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
+        lastAddedlabels = label;
+      }
+
+      @Override
+      public void close() throws IOException {
+        // do nothing 
+      }
+    };
+  }
+
+  @Override
+  protected void initDispatcher(Configuration conf) {
+    super.dispatcher = new InlineDispatcher();
+  }
+
+  @Override
+  protected void startDispatcher() {
+    // do nothing
+  }
+  
+  @Override
+  protected void stopDispatcher() {
+    // do nothing
+  }
+}

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/NodeLabelTestBase.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.junit.Assert;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+public class NodeLabelTestBase {
+  public static void assertMapEquals(Map<NodeId, Set<String>> m1,
+      ImmutableMap<NodeId, Set<String>> m2) {
+    Assert.assertEquals(m1.size(), m2.size());
+    for (NodeId k : m1.keySet()) {
+      Assert.assertTrue(m2.containsKey(k));
+      assertCollectionEquals(m1.get(k), m2.get(k));
+    }
+  }
+
+  public static void assertMapContains(Map<NodeId, Set<String>> m1,
+      ImmutableMap<NodeId, Set<String>> m2) {
+    for (NodeId k : m2.keySet()) {
+      Assert.assertTrue(m1.containsKey(k));
+      assertCollectionEquals(m1.get(k), m2.get(k));
+    }
+  }
+
+  public static void assertCollectionEquals(Collection<String> c1,
+      Collection<String> c2) {
+    Assert.assertEquals(c1.size(), c2.size());
+    Iterator<String> i1 = c1.iterator();
+    Iterator<String> i2 = c2.iterator();
+    while (i1.hasNext()) {
+      Assert.assertEquals(i1.next(), i2.next());
+    }
+  }
+
+  public static <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  public NodeId toNodeId(String str) {
+    if (str.contains(":")) {
+      int idx = str.indexOf(':');
+      NodeId id =
+          NodeId.newInstance(str.substring(0, idx),
+              Integer.valueOf(str.substring(idx + 1)));
+      return id;
+    } else {
+      return NodeId.newInstance(str, CommonNodeLabelsManager.WILDCARD_PORT);
+    }
+  }
+}

+ 261 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestCommonNodeLabelsManager.java

@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestCommonNodeLabelsManager extends NodeLabelTestBase {
+  DummyCommonNodeLabelsManager mgr = null;
+
+  @Before
+  public void before() {
+    mgr = new DummyCommonNodeLabelsManager();
+    mgr.init(new Configuration());
+    mgr.start();
+  }
+
+  @After
+  public void after() {
+    mgr.stop();
+  }
+
+  @Test(timeout = 5000)
+  public void testAddRemovelabel() throws Exception {
+    // Add some label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("hello"));
+    assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("hello"));
+
+    mgr.addToCluserNodeLabels(ImmutableSet.of("world"));
+    mgr.addToCluserNodeLabels(toSet("hello1", "world1"));
+    assertCollectionEquals(mgr.lastAddedlabels,
+        Sets.newHashSet("hello1", "world1"));
+
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Sets.newHashSet("hello", "world", "hello1", "world1")));
+
+    // try to remove null, empty and non-existed label, should fail
+    for (String p : Arrays.asList(null, CommonNodeLabelsManager.NO_LABEL, "xx")) {
+      boolean caught = false;
+      try {
+        mgr.removeFromClusterNodeLabels(Arrays.asList(p));
+      } catch (IOException e) {
+        caught = true;
+      }
+      Assert.assertTrue("remove label should fail "
+          + "when label is null/empty/non-existed", caught);
+    }
+
+    // Remove some label
+    mgr.removeFromClusterNodeLabels(Arrays.asList("hello"));
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("hello"));
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("world", "hello1", "world1")));
+
+    mgr.removeFromClusterNodeLabels(Arrays
+        .asList("hello1", "world1", "world"));
+    Assert.assertTrue(mgr.lastRemovedlabels.containsAll(Sets.newHashSet(
+        "hello1", "world1", "world")));
+    Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
+  }
+
+  @Test(timeout = 5000)
+  public void testAddlabelWithCase() throws Exception {
+    // Add some label, case will not ignore here
+    mgr.addToCluserNodeLabels(ImmutableSet.of("HeLlO"));
+    assertCollectionEquals(mgr.lastAddedlabels, Arrays.asList("HeLlO"));
+    Assert.assertFalse(mgr.getClusterNodeLabels().containsAll(Arrays.asList("hello")));
+  }
+
+  @Test(timeout = 5000)
+  public void testAddInvalidlabel() throws IOException {
+    boolean caught = false;
+    try {
+      Set<String> set = new HashSet<String>();
+      set.add(null);
+      mgr.addToCluserNodeLabels(set);
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("null label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of(CommonNodeLabelsManager.NO_LABEL));
+    } catch (IOException e) {
+      caught = true;
+    }
+
+    Assert.assertTrue("empty label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("-?"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("invalid label charactor should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of(StringUtils.repeat("c", 257)));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("too long label should not add to repo", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("-aaabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot start with \"-\"", caught);
+
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("_aaabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot start with \"_\"", caught);
+    
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("a^aabbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
+    
+    caught = false;
+    try {
+      mgr.addToCluserNodeLabels(ImmutableSet.of("aa[a]bbb"));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("label cannot contains other chars like ^[] ...", caught);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testAddReplaceRemoveLabelsOnNodes() throws Exception {
+    // set a label on a node, but label doesn't exist
+    boolean caught = false;
+    try {
+      mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("node"), toSet("label")));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("trying to set a label to a node but "
+        + "label doesn't exist in repository should fail", caught);
+
+    // set a label on a node, but node is null or empty
+    try {
+      mgr.replaceLabelsOnNode(ImmutableMap.of(
+          toNodeId(CommonNodeLabelsManager.NO_LABEL), toSet("label")));
+    } catch (IOException e) {
+      caught = true;
+    }
+    Assert.assertTrue("trying to add a empty node but succeeded", caught);
+
+    // set node->label one by one
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p3")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p2"), toNodeId("n2"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n2"), toSet("p3")));
+
+    // set bunch of node->label
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+        toNodeId("n1"), toSet("p1")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n1"),
+        toSet("p1"), toNodeId("n2"), toSet("p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n3"),
+        toSet("p3"), toNodeId("n1"), toSet("p1")));
+
+    /*
+     * n1: p1 
+     * n2: p3 
+     * n3: p3
+     */
+
+    // remove label on node
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    assertMapEquals(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n1"), CommonNodeLabelsManager.EMPTY_STRING_SET));
+
+    // add label on node
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2")));
+    assertMapEquals(
+        mgr.getNodeLabels(),
+        ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
+            toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
+    assertMapEquals(mgr.lastNodeToLabels,
+        ImmutableMap.of(toNodeId("n1"), toSet("p1"), toNodeId("n2"),
+            toSet("p2", "p3")));
+
+    // remove labels on node
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2", "p3"), toNodeId("n3"), toSet("p3")));
+    Assert.assertEquals(0, mgr.getNodeLabels().size());
+    assertMapEquals(mgr.lastNodeToLabels, ImmutableMap.of(toNodeId("n1"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n2"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n3"),
+        CommonNodeLabelsManager.EMPTY_STRING_SET));
+  }
+
+  @Test(timeout = 5000)
+  public void testRemovelabelWithNodes() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n2"), toSet("p2")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n3"), toSet("p3")));
+
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
+    assertMapEquals(mgr.getNodeLabels(),
+        ImmutableMap.of(toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p1"));
+
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p2", "p3"));
+    Assert.assertTrue(mgr.getNodeLabels().isEmpty());
+    Assert.assertTrue(mgr.getClusterNodeLabels().isEmpty());
+    assertCollectionEquals(mgr.lastRemovedlabels, Arrays.asList("p2", "p3"));
+  }
+}

+ 252 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/TestFileSystemNodeLabelsStore.java

@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.nodelabels;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestFileSystemNodeLabelsStore extends NodeLabelTestBase {
+  MockNodeLabelManager mgr = null;
+  Configuration conf = null;
+
+  private static class MockNodeLabelManager extends
+      CommonNodeLabelsManager {
+    @Override
+    protected void initDispatcher(Configuration conf) {
+      super.dispatcher = new InlineDispatcher();
+    }
+
+    @Override
+    protected void startDispatcher() {
+      // do nothing
+    }
+    
+    @Override
+    protected void stopDispatcher() {
+      // do nothing
+    }
+  }
+  
+  private FileSystemNodeLabelsStore getStore() {
+    return (FileSystemNodeLabelsStore) mgr.store;
+  }
+
+  @Before
+  public void before() throws IOException {
+    mgr = new MockNodeLabelManager();
+    conf = new Configuration();
+    File tempDir = File.createTempFile("nlb", ".tmp");
+    tempDir.delete();
+    tempDir.mkdirs();
+    tempDir.deleteOnExit();
+    conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_URI,
+        tempDir.getAbsolutePath());
+    mgr.init(conf);
+    mgr.start();
+  }
+
+  @After
+  public void after() throws IOException {
+    getStore().fs.delete(getStore().rootDirPath, true);
+    mgr.stop();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 10000)
+  public void testRecoverWithMirror() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.addToCluserNodeLabels(toSet("p4"));
+    mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2")));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+        toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+        toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+    /*
+     * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
+     */
+
+    mgr.removeFromClusterNodeLabels(toSet("p1"));
+    mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+    /*
+     * After removed p2: n2 p4: n4 p6: n6, n7
+     */
+    // shutdown mgr and start a new mgr
+    mgr.stop();
+
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+
+    // check variables
+    Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("p2", "p4", "p6")));
+
+    assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+        toNodeId("n7"), toSet("p6")));
+
+    // stutdown mgr and start a new mgr
+    mgr.stop();
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+
+    // check variables
+    Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("p2", "p4", "p6")));
+
+    assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+        toNodeId("n7"), toSet("p6")));
+    mgr.stop();
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 10000)
+  public void testEditlogRecover() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.addToCluserNodeLabels(toSet("p4"));
+    mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2")));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+        toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+        toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+    /*
+     * node -> partition p1: n1 p2: n2 p3: n3 p4: n4 p5: n5 p6: n6, n7
+     */
+
+    mgr.removeFromClusterNodeLabels(toSet("p1"));
+    mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+    /*
+     * After removed p2: n2 p4: n4 p6: n6, n7
+     */
+    // shutdown mgr and start a new mgr
+    mgr.stop();
+
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+
+    // check variables
+    Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("p2", "p4", "p6")));
+
+    assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+        toNodeId("n7"), toSet("p6")));
+    mgr.stop();
+  }
+  
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test//(timeout = 10000)
+  public void testSerilizationAfterRecovery() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.addToCluserNodeLabels(toSet("p4"));
+    mgr.addToCluserNodeLabels(toSet("p5", "p6"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2")));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n3"), toSet("p3"),
+        toNodeId("n4"), toSet("p4"), toNodeId("n5"), toSet("p5"),
+        toNodeId("n6"), toSet("p6"), toNodeId("n7"), toSet("p6")));
+
+    /*
+     * node -> labels 
+     * p1: n1 
+     * p2: n2 
+     * p3: n3
+     * p4: n4 
+     * p5: n5 
+     * p6: n6, n7
+     */
+
+    mgr.removeFromClusterNodeLabels(toSet("p1"));
+    mgr.removeFromClusterNodeLabels(Arrays.asList("p3", "p5"));
+
+    /*
+     * After removed 
+     * p2: n2 
+     * p4: n4 
+     * p6: n6, n7
+     */
+    // shutdown mgr and start a new mgr
+    mgr.stop();
+
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+    mgr.start();
+
+    // check variables
+    Assert.assertEquals(3, mgr.getClusterNodeLabels().size());
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("p2", "p4", "p6")));
+
+    assertMapContains(mgr.getNodeLabels(), ImmutableMap.of(toNodeId("n2"),
+        toSet("p2"), toNodeId("n4"), toSet("p4"), toNodeId("n6"), toSet("p6"),
+        toNodeId("n7"), toSet("p6")));
+    
+    /*
+     * Add label p7,p8 then shutdown
+     */
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+    mgr.start();
+    mgr.addToCluserNodeLabels(toSet("p7", "p8"));
+    mgr.stop();
+    
+    /*
+     * Restart, add label p9 and shutdown
+     */
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+    mgr.start();
+    mgr.addToCluserNodeLabels(toSet("p9"));
+    mgr.stop();
+    
+    /*
+     * Recovery, and see if p9 added
+     */
+    mgr = new MockNodeLabelManager();
+    mgr.init(conf);
+    mgr.start();
+
+    // check variables
+    Assert.assertEquals(6, mgr.getClusterNodeLabels().size());
+    Assert.assertTrue(mgr.getClusterNodeLabels().containsAll(
+        Arrays.asList("p2", "p4", "p6", "p7", "p8", "p9")));
+    mgr.stop();
+  }
+}

+ 447 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java

@@ -0,0 +1,447 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.ImmutableSet;
+
+public class RMNodeLabelsManager extends CommonNodeLabelsManager {
+  
+  protected static class Queue {
+    protected Set<String> acccessibleNodeLabels;
+    protected Resource resource;
+
+    protected Queue() {
+      acccessibleNodeLabels =
+          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+      resource = Resource.newInstance(0, 0);
+    }
+  }
+
+  ConcurrentMap<String, Queue> queueCollections =
+      new ConcurrentHashMap<String, Queue>();
+  protected AccessControlList adminAcl;
+  
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    adminAcl =
+        new AccessControlList(conf.get(YarnConfiguration.YARN_ADMIN_ACL,
+            YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
+  }
+
+  @Override
+  public void addLabelsToNode(Map<NodeId, Set<String>> addedLabelsToNode)
+      throws IOException {    
+    try {
+      writeLock.lock();
+
+      // get nodesCollection before edition
+      Map<String, Host> before = cloneNodeMap(addedLabelsToNode.keySet());
+
+      super.addLabelsToNode(addedLabelsToNode);
+
+      // get nodesCollection after edition
+      Map<String, Host> after = cloneNodeMap(addedLabelsToNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  protected void checkRemoveFromClusterNodeLabelsOfQueue(
+      Collection<String> labelsToRemove) throws IOException {
+    // Check if label to remove doesn't existed or null/empty, will throw
+    // exception if any of labels to remove doesn't meet requirement
+    for (String label : labelsToRemove) {
+      label = normalizeLabel(label);
+
+      // check if any queue contains this label
+      for (Entry<String, Queue> entry : queueCollections.entrySet()) {
+        String queueName = entry.getKey();
+        Set<String> queueLabels = entry.getValue().acccessibleNodeLabels;
+        if (queueLabels.contains(label)) {
+          throw new IOException("Cannot remove label=" + label
+              + ", because queue=" + queueName + " is using this label. "
+              + "Please remove label on queue before remove the label");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void removeFromClusterNodeLabels(Collection<String> labelsToRemove)
+      throws IOException {
+    try {
+      writeLock.lock();
+
+      checkRemoveFromClusterNodeLabelsOfQueue(labelsToRemove);
+
+      // copy before NMs
+      Map<String, Host> before = cloneNodeMap();
+
+      super.removeFromClusterNodeLabels(labelsToRemove);
+
+      updateResourceMappings(before, nodeCollections);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void
+      removeLabelsFromNode(Map<NodeId, Set<String>> removeLabelsFromNode)
+          throws IOException {
+    try {
+      writeLock.lock();
+
+      // get nodesCollection before edition
+      Map<String, Host> before =
+          cloneNodeMap(removeLabelsFromNode.keySet());
+
+      super.removeLabelsFromNode(removeLabelsFromNode);
+
+      // get nodesCollection before edition
+      Map<String, Host> after = cloneNodeMap(removeLabelsFromNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void replaceLabelsOnNode(Map<NodeId, Set<String>> replaceLabelsToNode)
+      throws IOException {
+    try {
+      writeLock.lock();
+      
+      // get nodesCollection before edition
+      Map<String, Host> before = cloneNodeMap(replaceLabelsToNode.keySet());
+
+      super.replaceLabelsOnNode(replaceLabelsToNode);
+
+      // get nodesCollection after edition
+      Map<String, Host> after = cloneNodeMap(replaceLabelsToNode.keySet());
+
+      // update running nodes resources
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+
+  /*
+   * Following methods are used for setting if a node is up and running, and it
+   * will update running nodes resource
+   */
+  public void activateNode(NodeId nodeId, Resource resource) {
+    try {
+      writeLock.lock();
+      
+      // save if we have a node before
+      Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
+      
+      createNodeIfNonExisted(nodeId);
+      Node nm = getNMInNodeSet(nodeId);
+      nm.resource = resource;
+      nm.running = true;
+      
+      // get the node after edition
+      Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
+      
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  /*
+   * Following methods are used for setting if a node unregistered to RM
+   */
+  public void deactivateNode(NodeId nodeId) {
+    try {
+      writeLock.lock();
+      
+      // save if we have a node before
+      Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
+      Node nm = getNMInNodeSet(nodeId);
+      if (null != nm) {
+        // set nm is not running, and its resource = 0
+        nm.running = false;
+        nm.resource = Resource.newInstance(0, 0);
+      }
+      
+      // get the node after edition
+      Map<String, Host> after = cloneNodeMap(ImmutableSet.of(nodeId));
+      
+      updateResourceMappings(before, after);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void updateNodeResource(NodeId node, Resource newResource) {
+    deactivateNode(node);
+    activateNode(node, newResource);
+  }
+
+  public void reinitializeQueueLabels(Map<String, Set<String>> queueToLabels) {
+    try {
+      writeLock.lock();
+      // clear before set
+      this.queueCollections.clear();
+
+      for (Entry<String, Set<String>> entry : queueToLabels.entrySet()) {
+        String queue = entry.getKey();
+        Queue q = new Queue();
+        this.queueCollections.put(queue, q);
+
+        Set<String> labels = entry.getValue();
+        if (labels.contains(ANY)) {
+          continue;
+        }
+
+        q.acccessibleNodeLabels.addAll(labels);
+        for (Host host : nodeCollections.values()) {
+          for (Entry<NodeId, Node> nentry : host.nms.entrySet()) {
+            NodeId nodeId = nentry.getKey();
+            Node nm = nentry.getValue();
+            if (nm.running && isNodeUsableByQueue(getLabelsByNode(nodeId), q)) {
+              Resources.addTo(q.resource, nm.resource);
+            }
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+  
+  public Resource getQueueResource(String queueName, Set<String> queueLabels,
+      Resource clusterResource) {
+    try {
+      readLock.lock();
+      if (queueLabels.contains(ANY)) {
+        return clusterResource;
+      }
+      Queue q = queueCollections.get(queueName);
+      if (null == q) {
+        return Resources.none();
+      }
+      return q.resource;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public Set<String> getLabelsOnNode(NodeId nodeId) {
+    try {
+      readLock.lock();
+      Set<String> nodeLabels = getLabelsByNode(nodeId);
+      return Collections.unmodifiableSet(nodeLabels);
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
+  public boolean containsNodeLabel(String label) {
+    try {
+      readLock.lock();
+      return label != null
+          && (label.isEmpty() || labelCollections.containsKey(label));
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private Map<String, Host> cloneNodeMap(Set<NodeId> nodesToCopy) {
+    Map<String, Host> map = new HashMap<String, Host>();
+    for (NodeId nodeId : nodesToCopy) {
+      if (!map.containsKey(nodeId.getHost())) {
+        Host originalN = nodeCollections.get(nodeId.getHost());
+        if (null == originalN) {
+          continue;
+        }
+        Host n = originalN.copy();
+        n.nms.clear();
+        map.put(nodeId.getHost(), n);
+      }
+
+      Host n = map.get(nodeId.getHost());
+      if (WILDCARD_PORT == nodeId.getPort()) {
+        for (Entry<NodeId, Node> entry : nodeCollections
+            .get(nodeId.getHost()).nms.entrySet()) {
+          n.nms.put(entry.getKey(), entry.getValue().copy());
+        }
+      } else {
+        Node nm = getNMInNodeSet(nodeId);
+        if (null != nm) {
+          n.nms.put(nodeId, nm.copy());
+        }
+      }
+    }
+    return map;
+  }
+
+  private void updateResourceMappings(Map<String, Host> before,
+      Map<String, Host> after) {
+    // Get NMs in before only
+    Set<NodeId> allNMs = new HashSet<NodeId>();
+    for (Entry<String, Host> entry : before.entrySet()) {
+      allNMs.addAll(entry.getValue().nms.keySet());
+    }
+    for (Entry<String, Host> entry : after.entrySet()) {
+      allNMs.addAll(entry.getValue().nms.keySet());
+    }
+
+    // traverse all nms
+    for (NodeId nodeId : allNMs) {
+      Node oldNM;
+      if ((oldNM = getNMInNodeSet(nodeId, before, true)) != null) {
+        Set<String> oldLabels = getLabelsByNode(nodeId, before);
+        // no label in the past
+        if (oldLabels.isEmpty()) {
+          // update labels
+          Label label = labelCollections.get(NO_LABEL);
+          Resources.subtractFrom(label.resource, oldNM.resource);
+
+          // update queues, all queue can access this node
+          for (Queue q : queueCollections.values()) {
+            Resources.subtractFrom(q.resource, oldNM.resource);
+          }
+        } else {
+          // update labels
+          for (String labelName : oldLabels) {
+            Label label = labelCollections.get(labelName);
+            if (null == label) {
+              continue;
+            }
+            Resources.subtractFrom(label.resource, oldNM.resource);
+          }
+
+          // update queues, only queue can access this node will be subtract
+          for (Queue q : queueCollections.values()) {
+            if (isNodeUsableByQueue(oldLabels, q)) {
+              Resources.subtractFrom(q.resource, oldNM.resource);
+            }
+          }
+        }
+      }
+
+      Node newNM;
+      if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
+        Set<String> newLabels = getLabelsByNode(nodeId, after);
+        // no label in the past
+        if (newLabels.isEmpty()) {
+          // update labels
+          Label label = labelCollections.get(NO_LABEL);
+          Resources.addTo(label.resource, newNM.resource);
+
+          // update queues, all queue can access this node
+          for (Queue q : queueCollections.values()) {
+            Resources.addTo(q.resource, newNM.resource);
+          }
+        } else {
+          // update labels
+          for (String labelName : newLabels) {
+            Label label = labelCollections.get(labelName);
+            Resources.addTo(label.resource, newNM.resource);
+          }
+
+          // update queues, only queue can access this node will be subtract
+          for (Queue q : queueCollections.values()) {
+            if (isNodeUsableByQueue(newLabels, q)) {
+              Resources.addTo(q.resource, newNM.resource);
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  public Resource getResourceByLabel(String label, Resource clusterResource) {
+    label = normalizeLabel(label);
+    try {
+      readLock.lock();
+      if (null == labelCollections.get(label)) {
+        return Resources.none();
+      }
+      return labelCollections.get(label).resource;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private boolean isNodeUsableByQueue(Set<String> nodeLabels, Queue q) {
+    // node without any labels can be accessed by any queue
+    if (nodeLabels == null || nodeLabels.isEmpty()
+        || (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) {
+      return true;
+    }
+
+    for (String label : nodeLabels) {
+      if (q.acccessibleNodeLabels.contains(label)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private Map<String, Host> cloneNodeMap() {
+    Set<NodeId> nodesToCopy = new HashSet<NodeId>();
+    for (String nodeName : nodeCollections.keySet()) {
+      nodesToCopy.add(NodeId.newInstance(nodeName, WILDCARD_PORT));
+    }
+    return cloneNodeMap(nodesToCopy);
+  }
+
+  public boolean checkAccess(UserGroupInformation user) {
+    // make sure only admin can invoke
+    // this method
+    if (adminAcl.isUserAllowed(user)) {
+      return true;
+    }
+    return false;
+  }
+}

+ 83 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/DummyRMNodeLabelsManager.java

@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
+
+public class DummyRMNodeLabelsManager extends RMNodeLabelsManager {
+  Map<NodeId, Set<String>> lastNodeToLabels = null;
+  Collection<String> lastAddedlabels = null;
+  Collection<String> lastRemovedlabels = null;
+
+  @Override
+  public void initNodeLabelStore(Configuration conf) {
+    this.store = new NodeLabelsStore(this) {
+
+      @Override
+      public void recover() throws IOException {
+        // do nothing
+      }
+
+      @Override
+      public void removeClusterNodeLabels(Collection<String> labels)
+          throws IOException {
+        // do nothing
+      }
+
+      @Override
+      public void updateNodeToLabelsMappings(
+          Map<NodeId, Set<String>> nodeToLabels) throws IOException {
+        // do nothing
+      }
+
+      @Override
+      public void storeNewClusterNodeLabels(Set<String> label) throws IOException {
+        // do nothing
+      }
+
+      @Override
+      public void close() throws IOException {
+        // do nothing
+      }
+    };
+  }
+
+  @Override
+  protected void initDispatcher(Configuration conf) {
+    super.dispatcher = new InlineDispatcher();
+  }
+
+  @Override
+  protected void startDispatcher() {
+    // do nothing
+  }
+  
+  @Override
+  protected void stopDispatcher() {
+    // do nothing
+  }
+}

+ 367 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java

@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.nodelabels;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class TestRMNodeLabelsManager extends NodeLabelTestBase {
+  private final Resource EMPTY_RESOURCE = Resource.newInstance(0, 0);
+  private final Resource SMALL_RESOURCE = Resource.newInstance(100, 0);
+  private final Resource LARGE_NODE = Resource.newInstance(1000, 0);
+  
+  DummyRMNodeLabelsManager mgr = null;
+
+  @Before
+  public void before() {
+    mgr = new DummyRMNodeLabelsManager();
+    mgr.init(new Configuration());
+    mgr.start();
+  }
+
+  @After
+  public void after() {
+    mgr.stop();
+  }
+  
+  @Test(timeout = 5000)
+  public void testNodeActiveDeactiveUpdate() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p3", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+        EMPTY_RESOURCE);
+
+    // active two NM to n1, one large and one small
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+        Resources.add(SMALL_RESOURCE, LARGE_NODE));
+
+    // change the large NM to small, check if resource updated
+    mgr.updateNodeResource(NodeId.newInstance("n1", 2), SMALL_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+
+    // deactive one NM, and check if resource updated
+    mgr.deactivateNode(NodeId.newInstance("n1", 1));
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
+
+    // continus deactive, check if resource updated
+    mgr.deactivateNode(NodeId.newInstance("n1", 2));
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
+
+    // Add two NM to n1 back
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
+
+    // And remove p1, now the two NM should come to default label,
+    mgr.removeFromClusterNodeLabels(ImmutableSet.of("p1"));
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+        Resources.add(SMALL_RESOURCE, LARGE_NODE));
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test(timeout = 5000)
+  public void testUpdateNodeLabelWithActiveNode() throws Exception {
+    mgr.addToCluserNodeLabels(toSet("p1", "p2", "p3"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p1"),
+        toNodeId("n2"), toSet("p2"), toNodeId("n3"), toSet("p3")));
+
+    // active two NM to n1, one large and one small
+    mgr.activateNode(NodeId.newInstance("n1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n2", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n3", 1), SMALL_RESOURCE);
+
+    // change label of n1 to p2
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("n1"), toSet("p2")));
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null), EMPTY_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+    Assert.assertEquals(mgr.getResourceByLabel("p3", null), SMALL_RESOURCE);
+
+    // add more labels
+    mgr.addToCluserNodeLabels(toSet("p4", "p5", "p6"));
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p1"),
+        toNodeId("n5"), toSet("p2"), toNodeId("n6"), toSet("p3"),
+        toNodeId("n7"), toSet("p4"), toNodeId("n8"), toSet("p5")));
+
+    // now node -> label is,
+    // p1 : n4
+    // p2 : n1, n2, n5
+    // p3 : n3, n6
+    // p4 : n7
+    // p5 : n8
+    // no-label : n9
+
+    // active these nodes
+    mgr.activateNode(NodeId.newInstance("n4", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n5", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n6", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n7", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n8", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n9", 1), SMALL_RESOURCE);
+
+    // check varibles
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null), SMALL_RESOURCE);
+    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+        Resources.multiply(SMALL_RESOURCE, 3));
+    Assert.assertEquals(mgr.getResourceByLabel("p3", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+    Assert.assertEquals(mgr.getResourceByLabel("p4", null),
+        Resources.multiply(SMALL_RESOURCE, 1));
+    Assert.assertEquals(mgr.getResourceByLabel("p5", null),
+        Resources.multiply(SMALL_RESOURCE, 1));
+    Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
+        Resources.multiply(SMALL_RESOURCE, 1));
+
+    // change a bunch of nodes -> labels
+    // n4 -> p2
+    // n7 -> empty
+    // n5 -> p1
+    // n8 -> empty
+    // n9 -> p1
+    //
+    // now become:
+    // p1 : n5, n9
+    // p2 : n1, n2, n4
+    // p3 : n3, n6
+    // p4 : [ ]
+    // p5 : [ ]
+    // no label: n8, n7
+    mgr.replaceLabelsOnNode((Map) ImmutableMap.of(toNodeId("n4"), toSet("p2"),
+        toNodeId("n7"), RMNodeLabelsManager.EMPTY_STRING_SET, toNodeId("n5"),
+        toSet("p1"), toNodeId("n8"), RMNodeLabelsManager.EMPTY_STRING_SET,
+        toNodeId("n9"), toSet("p1")));
+
+    // check varibles
+    Assert.assertEquals(mgr.getResourceByLabel("p1", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+    Assert.assertEquals(mgr.getResourceByLabel("p2", null),
+        Resources.multiply(SMALL_RESOURCE, 3));
+    Assert.assertEquals(mgr.getResourceByLabel("p3", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+    Assert.assertEquals(mgr.getResourceByLabel("p4", null),
+        Resources.multiply(SMALL_RESOURCE, 0));
+    Assert.assertEquals(mgr.getResourceByLabel("p5", null),
+        Resources.multiply(SMALL_RESOURCE, 0));
+    Assert.assertEquals(mgr.getResourceByLabel("", null),
+        Resources.multiply(SMALL_RESOURCE, 2));
+  }
+  
+  @Test(timeout=5000)
+  public void testGetQueueResource() throws Exception {
+    Resource clusterResource = Resource.newInstance(9999, 1);
+    
+    /*
+     * Node->Labels:
+     *   host1 : red, blue
+     *   host2 : blue, yellow
+     *   host3 : yellow
+     *   host4 :
+     */
+    mgr.addToCluserNodeLabels(toSet("red", "blue", "yellow"));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host1"),
+        toSet("red", "blue")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host2"),
+        toSet("blue", "yellow")));
+    mgr.replaceLabelsOnNode(ImmutableMap.of(toNodeId("host3"), toSet("yellow")));
+    
+    // active two NM to n1, one large and one small
+    mgr.activateNode(NodeId.newInstance("host1", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host2", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host4", 1), SMALL_RESOURCE);
+    
+    // reinitialize queue
+    Set<String> q1Label = toSet("red", "blue");
+    Set<String> q2Label = toSet("blue", "yellow");
+    Set<String> q3Label = toSet("yellow");
+    Set<String> q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
+    Set<String> q5Label = toSet(RMNodeLabelsManager.ANY);
+    
+    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
+    queueToLabels.put("Q1", q1Label);
+    queueToLabels.put("Q2", q2Label);
+    queueToLabels.put("Q3", q3Label);
+    queueToLabels.put("Q4", q4Label);
+    queueToLabels.put("Q5", q5Label);
+
+    mgr.reinitializeQueueLabels(queueToLabels);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    mgr.removeLabelsFromNode(ImmutableMap.of(toNodeId("host1"), toSet("red"),
+        toNodeId("host2"), toSet("blue", "yellow")));
+    mgr.addLabelsToNode(ImmutableMap.of(toNodeId("host3"), toSet("red")));
+    /*
+     * Check resource after changes some labels
+     * Node->Labels:
+     *   host1 : blue (was: red, blue)
+     *   host2 : (was: blue, yellow)
+     *   host3 : red, yellow (was: yellow)
+     *   host4 :
+     */
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 4),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Check resource after deactive/active some nodes 
+     * Node->Labels:
+     *   (deactived) host1 : blue
+     *   host2 :
+     *   (deactived and then actived) host3 : red, yellow
+     *   host4 :
+     */
+    mgr.deactivateNode(NodeId.newInstance("host1", 1));
+    mgr.deactivateNode(NodeId.newInstance("host3", 1));
+    mgr.activateNode(NodeId.newInstance("host3", 1), SMALL_RESOURCE);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Check resource after refresh queue:
+     *    Q1: blue
+     *    Q2: red, blue
+     *    Q3: red
+     *    Q4:
+     *    Q5: ANY
+     */
+    q1Label = toSet("blue");
+    q2Label = toSet("blue", "red");
+    q3Label = toSet("red");
+    q4Label = RMNodeLabelsManager.EMPTY_STRING_SET;
+    q5Label = toSet(RMNodeLabelsManager.ANY);
+    
+    queueToLabels.clear();
+    queueToLabels.put("Q1", q1Label);
+    queueToLabels.put("Q2", q2Label);
+    queueToLabels.put("Q3", q3Label);
+    queueToLabels.put("Q4", q4Label);
+    queueToLabels.put("Q5", q5Label);
+
+    mgr.reinitializeQueueLabels(queueToLabels);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 2),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Active NMs in nodes already have NM
+     * Node->Labels:
+     *   host2 :
+     *   host3 : red, yellow (3 NMs)
+     *   host4 : (2 NMs)
+     */
+    mgr.activateNode(NodeId.newInstance("host3", 2), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host3", 3), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("host4", 2), SMALL_RESOURCE);
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 6),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+    
+    /*
+     * Deactive NMs in nodes already have NMs
+     * Node->Labels:
+     *   host2 :
+     *   host3 : red, yellow (2 NMs)
+     *   host4 : (0 NMs)
+     */
+    mgr.deactivateNode(NodeId.newInstance("host3", 3));
+    mgr.deactivateNode(NodeId.newInstance("host4", 2));
+    mgr.deactivateNode(NodeId.newInstance("host4", 1));
+    
+    // check resource
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q1", q1Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q2", q2Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 3),
+        mgr.getQueueResource("Q3", q3Label, clusterResource));
+    Assert.assertEquals(Resources.multiply(SMALL_RESOURCE, 1),
+        mgr.getQueueResource("Q4", q4Label, clusterResource));
+    Assert.assertEquals(clusterResource,
+        mgr.getQueueResource("Q5", q5Label, clusterResource));
+  }
+}