Explorar o código

YARN-2504. Enhanced RM Admin CLI to support management of node-labels. Contribyted by Wangda Tan.

(cherry picked from commit 82567664988b673f1b819a42a4baf31cb0dcb331)
Vinod Kumar Vavilapalli %!s(int64=10) %!d(string=hai) anos
pai
achega
9915d52185

+ 4 - 1
hadoop-yarn-project/CHANGES.txt

@@ -145,9 +145,12 @@ Release 2.6.0 - UNRELEASED
     YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating
     resources based on node-labels. (Wangda Tan via vinodkv)
 
-    YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources
+    YARN-2500. Enhaced ResourceManager to support schedulers allocating resources
     based on node-labels. (Wangda Tan via vinodkv)
 
+    YARN-2504. Enhanced RM Admin CLI to support management of node-labels.
+    (Wangda Tan via vinodkv)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu 

+ 40 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java

@@ -30,6 +30,12 @@ import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -42,6 +48,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 
@@ -110,4 +120,34 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
   public UpdateNodeResourceResponse updateNodeResource(
       UpdateNodeResourceRequest request) 
   throws YarnException, IOException;
+   
+  @Public
+  @Evolving
+  @Idempotent
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)
+      throws YarnException, IOException;
+   
+  @Public
+  @Evolving
+  @Idempotent
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException;
+  
+  @Public
+  @Evolving
+  @Idempotent
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException;
+  
+  @Public
+  @Evolving
+  @Idempotent
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException;
+  
+  @Public
+  @Evolving
+  @Idempotent
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException;
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/resourcemanager_administration_protocol.proto

@@ -39,4 +39,9 @@ service ResourceManagerAdministrationProtocolService {
   rpc refreshServiceAcls(RefreshServiceAclsRequestProto) returns (RefreshServiceAclsResponseProto);
   rpc getGroupsForUser(GetGroupsForUserRequestProto) returns (GetGroupsForUserResponseProto);
   rpc updateNodeResource (UpdateNodeResourceRequestProto) returns (UpdateNodeResourceResponseProto);
+  rpc addToClusterNodeLabels(AddToClusterNodeLabelsRequestProto) returns (AddToClusterNodeLabelsResponseProto);
+  rpc removeFromClusterNodeLabels(RemoveFromClusterNodeLabelsRequestProto) returns (RemoveFromClusterNodeLabelsResponseProto);
+  rpc replaceLabelsOnNodes(ReplaceLabelsOnNodeRequestProto) returns (ReplaceLabelsOnNodeResponseProto);
+  rpc getNodeToLabels(GetNodesToLabelsRequestProto) returns (GetNodesToLabelsResponseProto);
+  rpc getClusterNodeLabels(GetClusterNodeLabelsRequestProto) returns (GetClusterNodeLabelsResponseProto);
 }

+ 254 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java

@@ -19,11 +19,17 @@
 package org.apache.hadoop.yarn.client.cli;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +39,7 @@ import org.apache.hadoop.ha.HAServiceTarget;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.RMHAServiceTarget;
 import org.apache.hadoop.yarn.conf.HAUtil;
@@ -41,13 +48,21 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+
+import com.google.common.collect.ImmutableMap;
 
 @Private
 @Unstable
@@ -55,6 +70,8 @@ public class RMAdminCLI extends HAAdmin {
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
+  private boolean directlyAccessNodeLabelStore = false;
+  static CommonNodeLabelsManager localNodeLabelsManager = null;
 
   protected final static Map<String, UsageInfo> ADMIN_USAGE =
       ImmutableMap.<String, UsageInfo>builder()
@@ -78,7 +95,30 @@ public class RMAdminCLI extends HAAdmin {
           .put("-help", new UsageInfo("[cmd]",
               "Displays help for the given command or all commands if none " +
                   "is specified."))
-          .build();
+          .put("-addToClusterNodeLabels",
+              new UsageInfo("[label1,label2,label3] (label splitted by \",\")",
+                  "add to cluster node labels "))
+          .put("-removeFromClusterNodeLabels",
+              new UsageInfo("[label1,label2,label3] (label splitted by \",\")",
+                  "remove from cluster node labels"))
+          .put("-replaceLabelsOnNode",
+              new UsageInfo("[node1:port,label1,label2 node2:port,label1,label2]",
+                  "replace labels on nodes"))
+          .put("-getNodeToLabels", new UsageInfo("", 
+              "Get node to label mappings"))
+          .put("-getClusterNodeLabels",
+              new UsageInfo("", "Get node labels in the cluster"))
+          .put("-directlyAccessNodeLabelStore",
+              new UsageInfo("", "Directly access node label store, "
+                  + "with this option, all node label related operations"
+                  + " will not connect RM. Instead, they will"
+                  + " access/modify stored node labels directly."
+                  + " By default, it is false (access via RM)."
+                  + " AND PLEASE NOTE: if you configured"
+                  + " yarn.node-labels.fs-store.uri to a local directory"
+                  + " (instead of NFS or HDFS), this option will only work"
+                  + " when the command run on the machine where RM is running."))              
+              .build();
 
   public RMAdminCLI() {
     super();
@@ -201,11 +241,13 @@ public class RMAdminCLI extends HAAdmin {
     ToolRunner.printGenericCommandUsage(System.err);
 
   }
-
-  protected ResourceManagerAdministrationProtocol createAdminProtocol() throws IOException {
+  
+  protected ResourceManagerAdministrationProtocol createAdminProtocol()
+      throws IOException {
     // Get the current configuration
     final YarnConfiguration conf = new YarnConfiguration(getConf());
-    return ClientRMProxy.createRMProxy(conf, ResourceManagerAdministrationProtocol.class);
+    return ClientRMProxy.createRMProxy(conf,
+        ResourceManagerAdministrationProtocol.class);
   }
   
   private int refreshQueues() throws IOException, YarnException {
@@ -285,8 +327,187 @@ public class RMAdminCLI extends HAAdmin {
     return 0;
   }
   
+  // Make it protected to make unit test can change it.
+  protected static synchronized CommonNodeLabelsManager
+      getNodeLabelManagerInstance(Configuration conf) {
+    if (localNodeLabelsManager == null) {
+      localNodeLabelsManager = new CommonNodeLabelsManager();
+      localNodeLabelsManager.init(conf);
+      localNodeLabelsManager.start();
+    }
+    return localNodeLabelsManager;
+  }
+  
+  private int addToClusterNodeLabels(String args) throws IOException,
+      YarnException {
+    Set<String> labels = new HashSet<String>();
+    for (String p : args.split(",")) {
+      labels.add(p);
+    }
+
+    return addToClusterNodeLabels(labels);
+  }
+
+  private int addToClusterNodeLabels(Set<String> labels) throws IOException,
+      YarnException {
+    if (directlyAccessNodeLabelStore) {
+      getNodeLabelManagerInstance(getConf()).addToCluserNodeLabels(labels);
+    } else {
+      ResourceManagerAdministrationProtocol adminProtocol =
+          createAdminProtocol();
+      AddToClusterNodeLabelsRequest request =
+          AddToClusterNodeLabelsRequest.newInstance(labels);
+      adminProtocol.addToClusterNodeLabels(request);
+    }
+    return 0;
+  }
+
+  private int removeFromClusterNodeLabels(String args) throws IOException,
+      YarnException {
+    Set<String> labels = new HashSet<String>();
+    for (String p : args.split(",")) {
+      labels.add(p);
+    }
+    
+    if (directlyAccessNodeLabelStore) {
+      getNodeLabelManagerInstance(getConf()).removeFromClusterNodeLabels(labels);
+    } else {
+      ResourceManagerAdministrationProtocol adminProtocol =
+          createAdminProtocol();
+      RemoveFromClusterNodeLabelsRequest request =
+          RemoveFromClusterNodeLabelsRequest.newInstance(labels);
+      adminProtocol.removeFromClusterNodeLabels(request);
+    }
+
+    return 0;
+  }
+
+  private int getNodeToLabels() throws IOException, YarnException {
+    Map<NodeId, Set<String>> nodeToLabels = null;
+
+    if (directlyAccessNodeLabelStore) {
+      nodeToLabels = getNodeLabelManagerInstance(getConf()).getNodeLabels();
+    } else {
+      ResourceManagerAdministrationProtocol adminProtocol =
+          createAdminProtocol();
+
+      nodeToLabels =
+          adminProtocol.getNodeToLabels(GetNodesToLabelsRequest.newInstance())
+              .getNodeToLabels();
+    }
+    for (NodeId host : sortNodeIdSet(nodeToLabels.keySet())) {
+      System.out.println(String.format("Host=%s, Node-labels=[%s]",
+          (host.getPort() == 0 ? host.getHost() : host.toString()),
+          StringUtils.join(sortStrSet(nodeToLabels.get(host)), ",")));
+    }
+    return 0;
+  }
+
+  private int getClusterNodeLabels() throws IOException, YarnException {
+    Set<String> labels = null;
+    if (directlyAccessNodeLabelStore) {
+      labels = getNodeLabelManagerInstance(getConf()).getClusterNodeLabels();
+    } else {
+      ResourceManagerAdministrationProtocol adminProto = createAdminProtocol();
+      labels =
+          adminProto.getClusterNodeLabels(
+              GetClusterNodeLabelsRequest.newInstance()).getNodeLabels();
+    }
+
+    System.out.println(String.format("Node-labels=%s",
+        StringUtils.join(sortStrSet(labels).iterator(), ",")));
+    return 0;
+  }
+  
+  private List<NodeId> sortNodeIdSet(Set<NodeId> nodes) {
+    List<NodeId> list = new ArrayList<NodeId>();
+    list.addAll(nodes);
+    Collections.sort(list);
+    return list;
+  }
+  
+  private List<String> sortStrSet(Set<String> labels) {
+    List<String> list = new ArrayList<String>();
+    list.addAll(labels);
+    Collections.sort(list);
+    return list;
+  }
+  
+  private Map<NodeId, Set<String>> buildNodeLabelsFromStr(String args)
+      throws IOException {
+    Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();
+
+    for (String nodeToLabels : args.split("[ \n]")) {
+      nodeToLabels = nodeToLabels.trim();
+      if (nodeToLabels.isEmpty() || nodeToLabels.startsWith("#")) {
+        continue;
+      }
+
+      String[] splits = nodeToLabels.split(",");
+      String nodeIdStr = splits[0];
+
+      if (nodeIdStr.trim().isEmpty()) {
+        throw new IOException("node name cannot be empty");
+      }
+
+      String nodeName;
+      int port;
+      if (nodeIdStr.contains(":")) {
+        nodeName = nodeIdStr.substring(0, nodeIdStr.indexOf(":"));
+        port = Integer.valueOf(nodeIdStr.substring(nodeIdStr.indexOf(":")));
+      } else {
+        nodeName = nodeIdStr;
+        port = 0;
+      }
+
+      NodeId nodeId = NodeId.newInstance(nodeName, port);
+
+      map.put(nodeId, new HashSet<String>());
+
+      for (int i = 1; i < splits.length; i++) {
+        if (!splits[i].trim().isEmpty()) {
+          map.get(nodeId).add(splits[i].trim().toLowerCase());
+        }
+      }
+    }
+
+    return map;
+  }
+
+  private int replaceLabelsOnNodes(String args) throws IOException,
+      YarnException {
+    Map<NodeId, Set<String>> map = buildNodeLabelsFromStr(args);
+    return replaceLabelsOnNodes(map);
+  }
+
+  private int replaceLabelsOnNodes(Map<NodeId, Set<String>> map)
+      throws IOException, YarnException {
+    if (directlyAccessNodeLabelStore) {
+      getNodeLabelManagerInstance(getConf()).replaceLabelsOnNode(map);
+    } else {
+      ResourceManagerAdministrationProtocol adminProtocol =
+          createAdminProtocol();
+      ReplaceLabelsOnNodeRequest request =
+          ReplaceLabelsOnNodeRequest.newInstance(map);
+      adminProtocol.replaceLabelsOnNode(request);
+    }
+    return 0;
+  }
+  
   @Override
   public int run(String[] args) throws Exception {
+    // -directlyAccessNodeLabelStore is a additional option for node label
+    // access, so just search if we have specified this option, and remove it
+    List<String> argsList = new ArrayList<String>();
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-directlyAccessNodeLabelStore")) {
+        directlyAccessNodeLabelStore = true;
+      } else {
+        argsList.add(args[i]);
+      }
+    }
+    args = argsList.toArray(new String[0]);
+    
     YarnConfiguration yarnConf =
         getConf() == null ? new YarnConfiguration() : new YarnConfiguration(
             getConf());
@@ -351,6 +572,31 @@ public class RMAdminCLI extends HAAdmin {
       } else if ("-getGroups".equals(cmd)) {
         String[] usernames = Arrays.copyOfRange(args, i, args.length);
         exitCode = getGroups(usernames);
+      } else if ("-addToClusterNodeLabels".equals(cmd)) {
+        if (i >= args.length) {
+          System.err.println("No cluster node-labels are specified");
+          exitCode = -1;
+        } else {
+          exitCode = addToClusterNodeLabels(args[i]);
+        }
+      } else if ("-removeFromClusterNodeLabels".equals(cmd)) {
+        if (i >= args.length) {
+          System.err.println("No cluster node-labels are specified");
+          exitCode = -1;
+        } else {
+          exitCode = removeFromClusterNodeLabels(args[i]);
+        }
+      } else if ("-replaceLabelsOnNode".equals(cmd)) {
+        if (i >= args.length) {
+          System.err.println("No cluster node-labels are specified");
+          exitCode = -1;
+        } else {
+          exitCode = replaceLabelsOnNodes(args[i]);
+        }
+      } else if ("-getNodeToLabels".equals(cmd)) {
+        exitCode = getNodeToLabels();
+      } else if ("-getClusterNodeLabels".equals(cmd)) {
+        exitCode = getClusterNodeLabels();
       } else {
         exitCode = -1;
         System.err.println(cmd.substring(1) + ": Unknown command");
@@ -380,6 +626,9 @@ public class RMAdminCLI extends HAAdmin {
       System.err.println(cmd.substring(1) + ": "
                          + e.getLocalizedMessage());
     }
+    if (null != localNodeLabelsManager) {
+      localNodeLabelsManager.stop();
+    }
     return exitCode;
   }
 

+ 152 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMAdminCLI.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestRMAdminCLI.java

@@ -16,18 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.cli;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.never;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -37,9 +36,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HAServiceTarget;
+import org.apache.hadoop.service.Service.STATE;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.nodelabels.DummyCommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
@@ -49,6 +55,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.ImmutableSet;
 
 public class TestRMAdminCLI {
 
@@ -56,10 +66,25 @@ public class TestRMAdminCLI {
   private HAServiceProtocol haadmin;
   private RMAdminCLI rmAdminCLI;
   private RMAdminCLI rmAdminCLIWithHAEnabled;
+  private CommonNodeLabelsManager dummyNodeLabelsManager;
+  private boolean remoteAdminServiceAccessed = false;
 
+  @SuppressWarnings("static-access")
   @Before
-  public void configure() throws IOException {
+  public void configure() throws IOException, YarnException {
+    remoteAdminServiceAccessed = false;
+    dummyNodeLabelsManager = new DummyCommonNodeLabelsManager();
     admin = mock(ResourceManagerAdministrationProtocol.class);
+    when(admin.addToClusterNodeLabels(any(AddToClusterNodeLabelsRequest.class)))
+        .thenAnswer(new Answer<AddToClusterNodeLabelsResponse>() {
+
+          @Override
+          public AddToClusterNodeLabelsResponse answer(
+              InvocationOnMock invocation) throws Throwable {
+            remoteAdminServiceAccessed = true;
+            return AddToClusterNodeLabelsResponse.newInstance();
+          }
+        });
 
     haadmin = mock(HAServiceProtocol.class);
     when(haadmin.getServiceStatus()).thenReturn(new HAServiceStatus(
@@ -69,7 +94,6 @@ public class TestRMAdminCLI {
     when(haServiceTarget.getProxy(any(Configuration.class), anyInt()))
         .thenReturn(haadmin);
     rmAdminCLI = new RMAdminCLI(new Configuration()) {
-
       @Override
       protected ResourceManagerAdministrationProtocol createAdminProtocol()
           throws IOException {
@@ -81,6 +105,7 @@ public class TestRMAdminCLI {
         return haServiceTarget;
       }
     };
+    rmAdminCLI.localNodeLabelsManager = dummyNodeLabelsManager;
 
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@@ -360,6 +385,127 @@ public class TestRMAdminCLI {
       System.setErr(oldErrPrintStream);
     }
   }
+  
+  @Test
+  public void testAccessLocalNodeLabelManager() throws Exception {
+    assertFalse(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED);
+    
+    String[] args =
+        { "-addToClusterNodeLabels", "x,y", "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+    assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
+        ImmutableSet.of("x", "y")));
+    
+    // reset localNodeLabelsManager
+    dummyNodeLabelsManager.removeFromClusterNodeLabels(ImmutableSet.of("x", "y"));
+    
+    // change the sequence of "-directlyAccessNodeLabelStore" and labels,
+    // should not matter
+    args =
+        new String[] { "-addToClusterNodeLabels",
+            "-directlyAccessNodeLabelStore", "x,y" };
+    assertEquals(0, rmAdminCLI.run(args));
+    assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
+        ImmutableSet.of("x", "y")));
+    
+    // local node labels manager will be close after running
+    assertTrue(dummyNodeLabelsManager.getServiceState() == STATE.STOPPED);
+  }
+  
+  @Test
+  public void testAccessRemoteNodeLabelManager() throws Exception {
+    String[] args =
+        { "-addToClusterNodeLabels", "x,y" };
+    assertEquals(0, rmAdminCLI.run(args));
+    
+    // localNodeLabelsManager shouldn't accessed
+    assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty());
+    
+    // remote node labels manager accessed
+    assertTrue(remoteAdminServiceAccessed);
+  }
+  
+  @Test
+  public void testAddToClusterNodeLabels() throws Exception {
+    // successfully add labels
+    String[] args =
+        { "-addToClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+    assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().containsAll(
+        ImmutableSet.of("x")));
+    
+    // no labels, should fail
+    args = new String[] { "-addToClusterNodeLabels" };
+    assertTrue(0 != rmAdminCLI.run(args));
+    
+    // no labels, should fail
+    args =
+        new String[] { "-addToClusterNodeLabels",
+            "-directlyAccessNodeLabelStore" };
+    assertTrue(0 != rmAdminCLI.run(args));
+  }
+  
+  @Test
+  public void testRemoveFromClusterNodeLabels() throws Exception {
+    // Successfully remove labels
+    dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x"));
+    String[] args =
+        { "-removeFromClusterNodeLabels", "x", "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+    assertTrue(dummyNodeLabelsManager.getClusterNodeLabels().isEmpty());
+    
+    // no labels, should fail
+    args = new String[] { "-removeFromClusterNodeLabels" };
+    assertTrue(0 != rmAdminCLI.run(args));
+    
+    // no labels, should fail
+    args =
+        new String[] { "-removeFromClusterNodeLabels",
+            "-directlyAccessNodeLabelStore" };
+    assertTrue(0 != rmAdminCLI.run(args));
+  }
+  
+  @Test
+  public void testReplaceLabelsOnNode() throws Exception {
+    // Successfully replace labels
+    dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    String[] args =
+        { "-replaceLabelsOnNode", "node1,x,y node2,y",
+            "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+    assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
+        NodeId.newInstance("node1", 0)));
+    assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
+        NodeId.newInstance("node2", 0)));
+    
+    // no labels, should fail
+    args = new String[] { "-replaceLabelsOnNode" };
+    assertTrue(0 != rmAdminCLI.run(args));
+    
+    // no labels, should fail
+    args =
+        new String[] { "-replaceLabelsOnNode",
+            "-directlyAccessNodeLabelStore" };
+    assertTrue(0 != rmAdminCLI.run(args));
+  }
+  
+  @Test
+  public void testGetClusterNodeLabels() throws Exception {
+    // Successfully get labels
+    String[] args =
+        { "-getClusterNodeLabels",
+            "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+  }
+  
+  @Test
+  public void testGetNodeToLabels() throws Exception {
+    // Successfully get node-to-labels
+    String[] args =
+        { "-getNodeToLabels",
+            "-directlyAccessNodeLabelStore" };
+    assertEquals(0, rmAdminCLI.run(args));
+  }
 
   private void testError(String[] args, String template,
       ByteArrayOutputStream data, int resultCode) throws Exception {

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java

@@ -238,7 +238,11 @@ public class CommonNodeLabelsManager extends AbstractService {
   protected void serviceStop() throws Exception {
     // finalize store
     stopDispatcher();
-    store.close();
+    
+    // only close store when we enabled store persistent
+    if (null != store) {
+      store.close();
+    }
   }
 
   /**

+ 96 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java

@@ -29,17 +29,28 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshServiceAclsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -52,8 +63,18 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
@@ -66,6 +87,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUse
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
 
@@ -205,5 +230,75 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
       return null;
     }
   }
-  
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+    AddToClusterNodeLabelsRequestProto requestProto =
+        ((AddToClusterNodeLabelsRequestPBImpl) request).getProto();
+    try {
+      return new AddToClusterNodeLabelsResponsePBImpl(
+          proxy.addToClusterNodeLabels(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request) throws YarnException,
+      IOException {
+    RemoveFromClusterNodeLabelsRequestProto requestProto =
+        ((RemoveFromClusterNodeLabelsRequestPBImpl) request).getProto();
+    try {
+      return new RemoveFromClusterNodeLabelsResponsePBImpl(
+          proxy.removeFromClusterNodeLabels(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    ReplaceLabelsOnNodeRequestProto requestProto =
+        ((ReplaceLabelsOnNodeRequestPBImpl) request).getProto();
+    try {
+      return new ReplaceLabelsOnNodeResponsePBImpl(proxy.replaceLabelsOnNodes(
+          null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request)
+      throws YarnException, IOException {
+    GetNodesToLabelsRequestProto requestProto =
+        ((GetNodesToLabelsRequestPBImpl) request).getProto();
+    try {
+      return new GetNodesToLabelsResponsePBImpl(proxy.getNodeToLabels(
+          null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    GetClusterNodeLabelsRequestProto requestProto =
+        ((GetClusterNodeLabelsRequestPBImpl) request).getProto();
+    try {
+      return new GetClusterNodeLabelsResponsePBImpl(proxy.getClusterNodeLabels(
+          null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
 }

+ 107 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/ResourceManagerAdministrationProtocolPBServiceImpl.java

@@ -22,8 +22,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.AddToClusterNodeLabelsResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetClusterNodeLabelsResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetGroupsForUserResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.GetNodesToLabelsResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshAdminAclsResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesRequestProto;
@@ -36,17 +42,32 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Refre
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshSuperUserGroupsConfigurationResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshUserToGroupsMappingsResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RemoveFromClusterNodeLabelsResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ReplaceLabelsOnNodeResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.AddToClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshAdminAclsResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshNodesRequestPBImpl;
@@ -59,6 +80,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUse
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshSuperUserGroupsConfigurationResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RefreshUserToGroupsMappingsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RemoveFromClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl;
 
@@ -204,4 +229,86 @@ public class ResourceManagerAdministrationProtocolPBServiceImpl implements Resou
     }
   }
 
+  @Override
+  public AddToClusterNodeLabelsResponseProto addToClusterNodeLabels(
+      RpcController controller, AddToClusterNodeLabelsRequestProto proto)
+      throws ServiceException {
+    AddToClusterNodeLabelsRequestPBImpl request =
+        new AddToClusterNodeLabelsRequestPBImpl(proto);
+    try {
+      AddToClusterNodeLabelsResponse response =
+          real.addToClusterNodeLabels(request);
+      return ((AddToClusterNodeLabelsResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponseProto removeFromClusterNodeLabels(
+      RpcController controller, RemoveFromClusterNodeLabelsRequestProto proto)
+      throws ServiceException {
+    RemoveFromClusterNodeLabelsRequestPBImpl request =
+        new RemoveFromClusterNodeLabelsRequestPBImpl(proto);
+    try {
+      RemoveFromClusterNodeLabelsResponse response =
+          real.removeFromClusterNodeLabels(request);
+      return ((RemoveFromClusterNodeLabelsResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponseProto replaceLabelsOnNodes(
+      RpcController controller, ReplaceLabelsOnNodeRequestProto proto)
+      throws ServiceException {
+    ReplaceLabelsOnNodeRequestPBImpl request =
+        new ReplaceLabelsOnNodeRequestPBImpl(proto);
+    try {
+      ReplaceLabelsOnNodeResponse response = real.replaceLabelsOnNode(request);
+      return ((ReplaceLabelsOnNodeResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetNodesToLabelsResponseProto getNodeToLabels(
+      RpcController controller, GetNodesToLabelsRequestProto proto)
+      throws ServiceException {
+    GetNodesToLabelsRequestPBImpl request =
+        new GetNodesToLabelsRequestPBImpl(proto);
+    try {
+      GetNodesToLabelsResponse response = real.getNodeToLabels(request);
+      return ((GetNodesToLabelsResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponseProto getClusterNodeLabels(
+      RpcController controller, GetClusterNodeLabelsRequestProto proto)
+      throws ServiceException {
+    GetClusterNodeLabelsRequestPBImpl request =
+        new GetClusterNodeLabelsRequestPBImpl(proto);
+    try {
+      GetClusterNodeLabelsResponse response =
+          real.getClusterNodeLabels(request);
+      return ((GetClusterNodeLabelsResponsePBImpl) response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/nodelabels/DummyCommonNodeLabelsManager.java

@@ -78,4 +78,9 @@ public class DummyCommonNodeLabelsManager extends CommonNodeLabelsManager {
   protected void stopDispatcher() {
     // do nothing
   }
+  
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
 }

+ 112 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -57,6 +57,12 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetNodesToLabelsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -69,8 +75,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetClusterNodeLabelsResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetNodesToLabelsResponsePBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
@@ -618,4 +630,104 @@ public class AdminService extends CompositeService implements
   public Server getServer() {
     return this.server;
   }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(AddToClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    String argName = "addToClusterNodeLabels";
+    UserGroupInformation user = checkAcls(argName);
+
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not add labels.");
+      throwStandbyException();
+    }
+
+    AddToClusterNodeLabelsResponse response =
+        recordFactory.newRecordInstance(AddToClusterNodeLabelsResponse.class);
+    try {
+      rmContext.getNodeLabelManager().addToCluserNodeLabels(request.getNodeLabels());
+      RMAuditLogger
+          .logSuccess(user.getShortUserName(), argName, "AdminService");
+      return response;
+    } catch (IOException ioe) {
+      LOG.info("Exception add labels", ioe);
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService", "Exception add label");
+      throw RPCUtil.getRemoteException(ioe);
+    }
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request) throws YarnException, IOException {
+    String argName = "removeFromClusterNodeLabels";
+    UserGroupInformation user = checkAcls(argName);
+
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not remove labels.");
+      throwStandbyException();
+    }
+
+    RemoveFromClusterNodeLabelsResponse response =
+        recordFactory.newRecordInstance(RemoveFromClusterNodeLabelsResponse.class);
+    try {
+      rmContext.getNodeLabelManager().removeFromClusterNodeLabels(request.getNodeLabels());
+      RMAuditLogger
+          .logSuccess(user.getShortUserName(), argName, "AdminService");
+      return response;
+    } catch (IOException ioe) {
+      LOG.info("Exception remove labels", ioe);
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService", "Exception remove label");
+      throw RPCUtil.getRemoteException(ioe);
+    }
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    String argName = "replaceLabelsOnNode";
+    UserGroupInformation user = checkAcls(argName);
+
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not set node to labels.");
+      throwStandbyException();
+    }
+
+    ReplaceLabelsOnNodeResponse response =
+        recordFactory.newRecordInstance(ReplaceLabelsOnNodeResponse.class);
+    try {
+      rmContext.getNodeLabelManager().replaceLabelsOnNode(
+          request.getNodeToLabels());
+      RMAuditLogger
+          .logSuccess(user.getShortUserName(), argName, "AdminService");
+      return response;
+    } catch (IOException ioe) {
+      LOG.info("Exception set node to labels. ", ioe);
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "Exception set node to labels.");
+      throw RPCUtil.getRemoteException(ioe);
+    }
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(GetNodesToLabelsRequest request)
+      throws YarnException, IOException {
+    return GetNodesToLabelsResponsePBImpl.newInstance(rmContext
+        .getNodeLabelManager().getNodeLabels());
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    return GetClusterNodeLabelsResponsePBImpl.newInstance(rmContext.getNodeLabelManager()
+        .getClusterNodeLabels());
+  }
 }