Sfoglia il codice sorgente

YARN-11379. [Federation] Support mapAttributesToNodes, getGroupsForUser API's for Federation. (#5596)

slfan1989 2 anni fa
parent
commit
668c0a0930

+ 32 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodesToAttributesMappingRequest.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -42,6 +43,19 @@ public abstract class NodesToAttributesMappingRequest {
     return request;
   }
 
+  /**
+   * Create a NodesToAttributesMappingRequest that also carries a subClusterId.
+   *
+   * @param operation the attribute mapping operation to set on the request.
+   * @param nodesToAttributes the node to attributes mappings to set on the request.
+   * @param failOnUnknownNodes value of the failOnUnknownNodes flag of the request.
+   * @param subClusterId subCluster Id to set on the request.
+   * @return the populated NodesToAttributesMappingRequest.
+   */
+  public static NodesToAttributesMappingRequest newInstance(
+      AttributeMappingOperationType operation, List<NodeToAttributes> nodesToAttributes,
+      boolean failOnUnknownNodes, String subClusterId) {
+    NodesToAttributesMappingRequest request =
+        Records.newRecord(NodesToAttributesMappingRequest.class);
+    request.setNodesToAttributes(nodesToAttributes);
+    request.setFailOnUnknownNodes(failOnUnknownNodes);
+    // The operation only needs to be set once.
+    request.setOperation(operation);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
   @Public
   @Unstable
   public abstract void setNodesToAttributes(
@@ -66,4 +80,22 @@ public abstract class NodesToAttributesMappingRequest {
   @Public
   @Unstable
   public abstract AttributeMappingOperationType getOperation();
+
+  /**
+   * Get the subClusterId carried by this request.
+   *
+   * @return subClusterId; the PB implementation added in this change returns
+   *         null when the field has not been set.
+   */
+  @Public
+  @InterfaceStability.Evolving
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the subClusterId carried by this request.
+   *
+   * @param subClusterId subCluster Id; the PB implementation added in this
+   *                     change clears the field when null is passed.
+   */
+  @Public
+  @InterfaceStability.Evolving
+  public abstract void setSubClusterId(String subClusterId);
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -156,6 +156,7 @@ message NodesToAttributesMappingRequestProto {
   optional AttributeMappingOperationTypeProto operation = 1 [default = REPLACE];
   repeated NodeToAttributesProto nodeToAttributes = 2;
   optional bool failOnUnknownNodes = 3;
+  optional string sub_cluster_id = 4;
 }
 
 message NodesToAttributesMappingResponseProto {

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodesToAttributesMappingRequestPBImpl.java

@@ -157,6 +157,22 @@ public class NodesToAttributesMappingRequestPBImpl
     return convertFromProtoFormat(p.getOperation());
   }
 
+  /**
+   * Read the subClusterId from proto when viaProto is set, otherwise from builder.
+   *
+   * @return subClusterId, or null when the field is not set.
+   */
+  @Override
+  public String getSubClusterId() {
+    NodesToAttributesMappingRequestProtoOrBuilder source = viaProto ? proto : builder;
+    if (!source.hasSubClusterId()) {
+      return null;
+    }
+    return source.getSubClusterId();
+  }
+
+  /**
+   * Store the subClusterId in the builder; a null value clears the field instead.
+   *
+   * @param subClusterId subCluster Id, may be null.
+   */
+  @Override
+  public void setSubClusterId(String subClusterId) {
+    maybeInitBuilder();
+    if (subClusterId != null) {
+      builder.setSubClusterId(subClusterId);
+    } else {
+      builder.clearSubClusterId();
+    }
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -819,6 +819,13 @@ public class MockRM extends ResourceManager {
         }
         return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
       }
+
+      // Mock group lookup: only the "admin" user belongs to a group ("admin");
+      // every other user gets an empty array.
+      public String[] getGroupsForUser(String user) throws IOException {
+        return "admin".equals(user) ? new String[]{"admin"} : new String[]{};
+      }
     };
   }
 

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -175,6 +175,10 @@ public final class RouterMetrics {
   private MutableGaugeInt numCheckForDecommissioningNodesFailedRetrieved;
   @Metric("# of refreshClusterMaxPriority failed to be retrieved")
   private MutableGaugeInt numRefreshClusterMaxPriorityFailedRetrieved;
+  @Metric("# of mapAttributesToNodes failed to be retrieved")
+  private MutableGaugeInt numMapAttributesToNodesFailedRetrieved;
+  @Metric("# of getGroupsForUser failed to be retrieved")
+  private MutableGaugeInt numGetGroupsForUserFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -311,6 +315,10 @@ public final class RouterMetrics {
   private MutableRate totalSucceededCheckForDecommissioningNodesRetrieved;
   @Metric("Total number of successful Retrieved RefreshClusterMaxPriority and latency(ms)")
   private MutableRate totalSucceededRefreshClusterMaxPriorityRetrieved;
+  @Metric("Total number of successful Retrieved MapAttributesToNodes and latency(ms)")
+  private MutableRate totalSucceededMapAttributesToNodesRetrieved;
+  @Metric("Total number of successful Retrieved GetGroupsForUser and latency(ms)")
+  private MutableRate totalSucceededGetGroupsForUsersRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -382,6 +390,8 @@ public final class RouterMetrics {
   private MutableQuantiles refreshNodesResourcesLatency;
   private MutableQuantiles checkForDecommissioningNodesLatency;
   private MutableQuantiles refreshClusterMaxPriorityLatency;
+  private MutableQuantiles mapAttributesToNodesLatency;
+  private MutableQuantiles getGroupsForUserLatency;
 
   private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
@@ -616,6 +626,12 @@ public final class RouterMetrics {
 
     refreshClusterMaxPriorityLatency = registry.newQuantiles("refreshClusterMaxPriorityLatency",
         "latency of refresh cluster max priority timeouts", "ops", "latency", 10);
+
+    mapAttributesToNodesLatency = registry.newQuantiles("mapAttributesToNodesLatency",
+        "latency of map attributes to nodes timeouts", "ops", "latency", 10);
+
+    getGroupsForUserLatency = registry.newQuantiles("getGroupsForUserLatency",
+        "latency of get groups for user timeouts", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -952,6 +968,16 @@ public final class RouterMetrics {
     return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().numSamples();
   }
 
+  /**
+   * @return number of samples in the last stat of the successful
+   *         mapAttributesToNodes rate metric.
+   */
+  @VisibleForTesting
+  public long getNumSucceededMapAttributesToNodesRetrieved() {
+    final long sampleCount =
+        totalSucceededMapAttributesToNodesRetrieved.lastStat().numSamples();
+    return sampleCount;
+  }
+
+  /**
+   * @return number of samples in the last stat of the successful
+   *         getGroupsForUser rate metric.
+   */
+  @VisibleForTesting
+  public long getNumSucceededGetGroupsForUsersRetrieved() {
+    final long sampleCount =
+        totalSucceededGetGroupsForUsersRetrieved.lastStat().numSamples();
+    return sampleCount;
+  }
+
   @VisibleForTesting
   public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
     return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@@ -1282,6 +1308,16 @@ public final class RouterMetrics {
     return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().mean();
   }
 
+  /**
+   * @return mean of the last stat of the successful mapAttributesToNodes
+   *         rate metric.
+   */
+  @VisibleForTesting
+  public double getLatencySucceededMapAttributesToNodesRetrieved() {
+    final double meanLatency =
+        totalSucceededMapAttributesToNodesRetrieved.lastStat().mean();
+    return meanLatency;
+  }
+
+  /**
+   * @return mean of the last stat of the successful getGroupsForUser
+   *         rate metric.
+   */
+  @VisibleForTesting
+  public double getLatencySucceededGetGroupsForUsersRetrieved() {
+    final double meanLatency =
+        totalSucceededGetGroupsForUsersRetrieved.lastStat().mean();
+    return meanLatency;
+  }
+
   @VisibleForTesting
   public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
     return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@@ -1559,6 +1595,14 @@ public final class RouterMetrics {
     return numRefreshClusterMaxPriorityFailedRetrieved.value();
   }
 
+  /**
+   * @return current value of the failed mapAttributesToNodes gauge.
+   */
+  public int getMapAttributesToNodesFailedRetrieved() {
+    return numMapAttributesToNodesFailedRetrieved.value();
+  }
+
+  /**
+   * @return current value of the failed getGroupsForUser gauge.
+   */
+  public int getGroupsForUserFailedRetrieved() {
+    return numGetGroupsForUserFailedRetrieved.value();
+  }
+
   public int getDelegationTokenFailedRetrieved() {
     return numGetDelegationTokenFailedRetrieved.value();
   }
@@ -1902,6 +1946,16 @@ public final class RouterMetrics {
     refreshClusterMaxPriorityLatency.add(duration);
   }
 
+  /**
+   * Record a successful mapAttributesToNodes call in both the rate metric
+   * and the latency quantiles.
+   *
+   * @param duration value added to both metrics (the rate metric is described
+   *                 as latency in ms).
+   */
+  public void succeededMapAttributesToNodesRetrieved(long duration) {
+    totalSucceededMapAttributesToNodesRetrieved.add(duration);
+    mapAttributesToNodesLatency.add(duration);
+  }
+
+  /**
+   * Record a successful getGroupsForUser call in both the rate metric
+   * and the latency quantiles.
+   *
+   * @param duration value added to both metrics (the rate metric is described
+   *                 as latency in ms).
+   */
+  public void succeededGetGroupsForUsersRetrieved(long duration) {
+    totalSucceededGetGroupsForUsersRetrieved.add(duration);
+    getGroupsForUserLatency.add(duration);
+  }
+
   public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
     totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
     refreshSuperUserGroupsConfLatency.add(duration);
@@ -2154,6 +2208,14 @@ public final class RouterMetrics {
     numRefreshClusterMaxPriorityFailedRetrieved.incr();
   }
 
+  /**
+   * Increment the failed mapAttributesToNodes gauge.
+   */
+  public void incrMapAttributesToNodesFailedRetrieved() {
+    numMapAttributesToNodesFailedRetrieved.incr();
+  }
+
+  /**
+   * Increment the failed getGroupsForUser gauge.
+   */
+  public void incrGetGroupsForUserFailedRetrieved() {
+    numGetGroupsForUserFailedRetrieved.incr();
+  }
+
   public void incrGetDelegationTokenFailedRetrieved() {
     numGetDelegationTokenFailedRetrieved.incr();
   }

+ 67 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.router.rmadmin;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -75,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Collection;
 import java.util.Set;
+import java.util.HashSet;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.BlockingQueue;
@@ -714,14 +714,76 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
 
   @Override
   public NodesToAttributesMappingResponse mapAttributesToNodes(
-      NodesToAttributesMappingRequest request)
-      throws YarnException, IOException {
-    throw new NotImplementedException();
+      NodesToAttributesMappingRequest request) throws YarnException, IOException {
+    // A request object is mandatory; a missing one counts as a failure.
+    // NOTE(review): the code below relies on logAndThrowException throwing - confirm.
+    if (request == null) {
+      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing mapAttributesToNodes request.", null);
+    }
+
+    // The request must carry a non-blank subClusterId.
+    final String targetSubClusterId = request.getSubClusterId();
+    if (StringUtils.isBlank(targetSubClusterId)) {
+      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing mapAttributesToNodes SubClusterId.", null);
+    }
+
+    try {
+      final long begin = clock.getTime();
+      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
+          new Class[]{NodesToAttributesMappingRequest.class}, new Object[]{request});
+      Collection<NodesToAttributesMappingResponse> responses = method.invokeConcurrent(
+          this, NodesToAttributesMappingResponse.class, targetSubClusterId);
+      // Any non-empty result is treated as success; a fresh response is returned.
+      if (CollectionUtils.isNotEmpty(responses)) {
+        routerMetrics.succeededMapAttributesToNodesRetrieved(clock.getTime() - begin);
+        return NodesToAttributesMappingResponse.newInstance();
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrMapAttributesToNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to mapAttributesToNodes due to exception. " + e.getMessage());
+    }
+
+    // Reached when the invocation produced no responses.
+    routerMetrics.incrMapAttributesToNodesFailedRetrieved();
+    throw new YarnException("Unable to mapAttributesToNodes.");
   }
 
   @Override
   public String[] getGroupsForUser(String user) throws IOException {
-    return new String[0];
+    // A non-blank user name is mandatory; a missing one counts as a failure.
+    if (StringUtils.isBlank(user)) {
+      routerMetrics.incrGetGroupsForUserFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException("Missing getGroupsForUser user.", null);
+    }
+
+    try {
+      final long begin = clock.getTime();
+      RMAdminProtocolMethod method = new RMAdminProtocolMethod(
+          new Class[]{String.class}, new Object[]{user});
+      Collection<String[]> responses = method.invokeConcurrent(this, String[].class, null);
+      if (CollectionUtils.isNotEmpty(responses)) {
+        final long elapsed = clock.getTime() - begin;
+        // Merge the group arrays of all responses; the set drops duplicates.
+        Set<String> merged = new HashSet<>();
+        for (String[] groupsOfOneResponse : responses) {
+          if (groupsOfOneResponse == null) {
+            continue;
+          }
+          for (String group : groupsOfOneResponse) {
+            merged.add(group);
+          }
+        }
+        routerMetrics.succeededGetGroupsForUsersRetrieved(elapsed);
+        return merged.toArray(new String[0]);
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrGetGroupsForUserFailedRetrieved();
+      RouterServerUtil.logAndThrowIOException(e,
+          "Unable to getGroupsForUser due to exception. " + e.getMessage());
+    }
+
+    // Reached when the invocation produced no responses.
+    routerMetrics.incrGetGroupsForUserFailedRetrieved();
+    throw new IOException("Unable to getGroupsForUser.");
   }
 
   @VisibleForTesting

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java

@@ -608,6 +608,16 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed refreshClusterMaxPriority call");
       metrics.incrRefreshClusterMaxPriorityFailedRetrieved();
     }
+
+    /**
+     * Simulate a failed mapAttributesToNodes call by incrementing the
+     * corresponding failure metric.
+     */
+    public void getMapAttributesToNodesFailed() {
+      // Log the API by its real name, matching the success-side mock message.
+      LOG.info("Mocked: failed mapAttributesToNodes call");
+      metrics.incrMapAttributesToNodesFailedRetrieved();
+    }
+
+    /**
+     * Simulate a failed getGroupsForUser call by incrementing the
+     * corresponding failure metric.
+     */
+    public void getGroupsForUserFailed() {
+      LOG.info("Mocked: failed getGroupsForUser call");
+      metrics.incrGetGroupsForUserFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -920,6 +930,18 @@ public class TestRouterMetrics {
           duration);
       metrics.succeededRefreshClusterMaxPriorityRetrieved(duration);
     }
+
+    /**
+     * Simulate a successful mapAttributesToNodes call.
+     *
+     * @param duration value recorded in the success metric.
+     */
+    public void getMapAttributesToNodesRetrieved(long duration) {
+      LOG.info("Mocked: successful MapAttributesToNodes call with duration {}",
+          duration);
+      metrics.succeededMapAttributesToNodesRetrieved(duration);
+    }
+
+    /**
+     * Simulate a successful getGroupsForUser call.
+     *
+     * @param duration value recorded in the success metric.
+     */
+    public void getGroupsForUsersRetrieved(long duration) {
+      LOG.info("Mocked: successful GetGroupsForUsers call with duration {}",
+          duration);
+      metrics.succeededGetGroupsForUsersRetrieved(duration);
+    }
   }
 
   @Test
@@ -2109,4 +2131,48 @@ public class TestRouterMetrics {
     Assert.assertEquals(225,
         metrics.getLatencySucceededRefreshClusterMaxPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
   }
+
+  @Test
+  public void testGetMapAttributesToNodesFailedRetrieved() {
+    // One mocked failure is expected to raise the failure gauge by exactly one.
+    final long failuresBefore = metrics.getMapAttributesToNodesFailedRetrieved();
+    badSubCluster.getMapAttributesToNodesFailed();
+    final long failuresAfter = metrics.getMapAttributesToNodesFailedRetrieved();
+    Assert.assertEquals(failuresBefore + 1, failuresAfter);
+  }
+
+  @Test
+  public void testGetMapAttributesToNodesRetrieved() {
+    // Sample count before this test records anything.
+    final long baseline = metrics.getNumSucceededMapAttributesToNodesRetrieved();
+
+    // First sample (150): count + 1, expected mean 150.
+    goodSubCluster.getMapAttributesToNodesRetrieved(150);
+    long countAfterFirst = metrics.getNumSucceededMapAttributesToNodesRetrieved();
+    Assert.assertEquals(baseline + 1, countAfterFirst);
+    double meanAfterFirst = metrics.getLatencySucceededMapAttributesToNodesRetrieved();
+    Assert.assertEquals(150, meanAfterFirst, ASSERT_DOUBLE_DELTA);
+
+    // Second sample (300): count + 2, expected mean (150 + 300) / 2 = 225.
+    goodSubCluster.getMapAttributesToNodesRetrieved(300);
+    long countAfterSecond = metrics.getNumSucceededMapAttributesToNodesRetrieved();
+    Assert.assertEquals(baseline + 2, countAfterSecond);
+    double meanAfterSecond = metrics.getLatencySucceededMapAttributesToNodesRetrieved();
+    Assert.assertEquals(225, meanAfterSecond, ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetGroupsForUserFailedRetrieved() {
+    // One mocked failure is expected to raise the failure gauge by exactly one.
+    final long failuresBefore = metrics.getGroupsForUserFailedRetrieved();
+    badSubCluster.getGroupsForUserFailed();
+    final long failuresAfter = metrics.getGroupsForUserFailedRetrieved();
+    Assert.assertEquals(failuresBefore + 1, failuresAfter);
+  }
+
+  @Test
+  public void testGetGroupsForUserRetrieved() {
+    // Sample count before this test records anything.
+    final long baseline = metrics.getNumSucceededGetGroupsForUsersRetrieved();
+
+    // First sample (150): count + 1, expected mean 150.
+    goodSubCluster.getGroupsForUsersRetrieved(150);
+    long countAfterFirst = metrics.getNumSucceededGetGroupsForUsersRetrieved();
+    Assert.assertEquals(baseline + 1, countAfterFirst);
+    double meanAfterFirst = metrics.getLatencySucceededGetGroupsForUsersRetrieved();
+    Assert.assertEquals(150, meanAfterFirst, ASSERT_DOUBLE_DELTA);
+
+    // Second sample (300): count + 2, expected mean (150 + 300) / 2 = 225.
+    goodSubCluster.getGroupsForUsersRetrieved(300);
+    long countAfterSecond = metrics.getNumSucceededGetGroupsForUsersRetrieved();
+    Assert.assertEquals(baseline + 2, countAfterSecond);
+    double meanAfterSecond = metrics.getLatencySucceededGetGroupsForUsersRetrieved();
+    Assert.assertEquals(225, meanAfterSecond, ASSERT_DOUBLE_DELTA);
+  }
 }

+ 53 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@@ -51,6 +53,9 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequ
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -60,7 +65,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -550,4 +557,50 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
     assertNotNull(nodeIds);
     assertEquals(0, nodeIds.size());
   }
+
+  @Test
+  public void testMapAttributesToNodesRequest() throws Exception {
+    // Case 1: the request itself is null.
+    LambdaTestUtils.intercept(YarnException.class, "Missing mapAttributesToNodes request.",
+        () -> interceptor.mapAttributesToNodes(null));
+
+    // Case 2: the request is populated but its subClusterId is null.
+    NodeAttribute attribute = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "x",
+        NodeAttributeType.STRING, "dfasdf");
+    NodeToAttributes hostMapping =
+        NodeToAttributes.newInstance("host1", Collections.singletonList(attribute));
+    List<NodeToAttributes> mappings = Collections.singletonList(hostMapping);
+    NodesToAttributesMappingRequest requestWithoutSubCluster =
+        NodesToAttributesMappingRequest.newInstance(
+        AttributeMappingOperationType.ADD, mappings, true, null);
+    LambdaTestUtils.intercept(YarnException.class, "Missing mapAttributesToNodes SubClusterId.",
+        () -> interceptor.mapAttributesToNodes(requestWithoutSubCluster));
+  }
+
+  @Test
+  public void testMapAttributesToNodesNormalRequest() throws Exception {
+    // Build a well-formed request addressed to subCluster "SC-1".
+    NodeAttribute nodeAttribute = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "x",
+        NodeAttributeType.STRING, "dfasdf");
+    List<NodeAttribute> nodeAttributeList = Collections.singletonList(nodeAttribute);
+    NodeToAttributes nodeToAttributes =
+        NodeToAttributes.newInstance("127.0.0.1", nodeAttributeList);
+    List<NodeToAttributes> nodeToAttributesList = Collections.singletonList(nodeToAttributes);
+    NodesToAttributesMappingRequest request = NodesToAttributesMappingRequest.newInstance(
+        AttributeMappingOperationType.ADD, nodeToAttributesList, true, "SC-1");
+    // The call must complete without throwing and hand back a response object.
+    assertNotNull(interceptor.mapAttributesToNodes(request));
+  }
+
+  @Test
+  public void testGetGroupsForUserRequest() throws Exception {
+    // A null user must be rejected with an IOException.
+    LambdaTestUtils.intercept(IOException.class, "Missing getGroupsForUser user.",
+        () -> interceptor.getGroupsForUser(null));
+  }
+
+  @Test
+  public void testGetGroupsForUserNormalRequest() throws Exception {
+    // The "admin" user is expected to resolve to exactly one group, "admin".
+    final String[] adminGroups = interceptor.getGroupsForUser("admin");
+    assertNotNull(adminGroups);
+    final int groupCount = adminGroups.length;
+    assertEquals(1, groupCount);
+    final String onlyGroup = adminGroups[0];
+    assertEquals("admin", onlyGroup);
+  }
 }