Browse Source

YARN-11373. [Federation] Support refreshQueues refreshNodes API's for Federation. (#5146)

slfan1989 2 years ago
parent
commit
f71fd885be
12 changed files with 330 additions and 30 deletions
  1. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java
  2. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java
  3. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto
  4. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java
  5. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
  6. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java
  7. 19 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java
  8. 29 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java
  9. 35 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
  10. 69 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java
  11. 58 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java
  12. 67 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshNodesRequest.java

@@ -53,6 +53,17 @@ public abstract class RefreshNodesRequest {
     return request;
     return request;
   }
   }
 
 
+  @Private
+  @Unstable
+  public static RefreshNodesRequest newInstance(
+      DecommissionType decommissionType, Integer timeout, String subClusterId) {
+    RefreshNodesRequest request = Records.newRecord(RefreshNodesRequest.class);
+    request.setDecommissionType(decommissionType);
+    request.setDecommissionTimeout(timeout);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
   /**
   /**
    * Set the DecommissionType.
    * Set the DecommissionType.
    * 
    * 
@@ -80,4 +91,18 @@ public abstract class RefreshNodesRequest {
    * @return decommissionTimeout
    * @return decommissionTimeout
    */
    */
   public abstract Integer getDecommissionTimeout();
   public abstract Integer getDecommissionTimeout();
+
+  /**
+   * Get the subClusterId.
+   *
+   * @return subClusterId.
+   */
+  public abstract String getSubClusterId();
+
+  /**
+   * Set the subClusterId.
+   *
+   * @param subClusterId subCluster Id.
+   */
+  public abstract void setSubClusterId(String subClusterId);
 }
 }

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RefreshQueuesRequest.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Records;
 
 
@@ -33,4 +34,20 @@ public abstract class RefreshQueuesRequest {
         Records.newRecord(RefreshQueuesRequest.class);
         Records.newRecord(RefreshQueuesRequest.class);
     return request;
     return request;
   }
   }
+
+  @Public
+  @Stable
+  public static RefreshQueuesRequest newInstance(String subClusterId) {
+    RefreshQueuesRequest request = Records.newRecord(RefreshQueuesRequest.class);
+    request.setSubClusterId(subClusterId);
+    return request;
+  }
+
+  @Public
+  @Unstable
+  public abstract String getSubClusterId();
+
+  @Private
+  @Unstable
+  public abstract void setSubClusterId(String subClusterId);
 }
 }

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/server/yarn_server_resourcemanager_service_protos.proto

@@ -32,6 +32,7 @@ package hadoop.yarn;
 import "yarn_protos.proto";
 import "yarn_protos.proto";
 
 
 message RefreshQueuesRequestProto {
 message RefreshQueuesRequestProto {
+  optional string sub_cluster_id = 1;
 }
 }
 message RefreshQueuesResponseProto {
 message RefreshQueuesResponseProto {
 }
 }
@@ -39,6 +40,7 @@ message RefreshQueuesResponseProto {
 message RefreshNodesRequestProto {
 message RefreshNodesRequestProto {
   optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
   optional DecommissionTypeProto decommissionType = 1 [default = NORMAL];
   optional int32 decommissionTimeout = 2;
   optional int32 decommissionTimeout = 2;
+  optional string sub_cluster_id = 3;
 }
 }
 message RefreshNodesResponseProto {
 message RefreshNodesResponseProto {
 }
 }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/package-info.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 
 

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * distributed with this work for additional information
@@ -15,6 +15,6 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
 package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/package-info.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * distributed with this work for additional information
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * limitations under the License.
  */
  */
-@InterfaceAudience.Public
+@Public
 package org.apache.hadoop.yarn.security.admin;
 package org.apache.hadoop.yarn.security.admin;
-import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
 
 

+ 19 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshNodesRequestPBImpl.java

@@ -31,9 +31,9 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 @Private
 @Private
 @Unstable
 @Unstable
 public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
 public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
-  RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
-  RefreshNodesRequestProto.Builder builder = null;
-  boolean viaProto = false;
+  private RefreshNodesRequestProto proto = RefreshNodesRequestProto.getDefaultInstance();
+  private RefreshNodesRequestProto.Builder builder = null;
+  private boolean viaProto = false;
   private DecommissionType decommissionType;
   private DecommissionType decommissionType;
 
 
   public RefreshNodesRequestPBImpl() {
   public RefreshNodesRequestPBImpl() {
@@ -123,6 +123,22 @@ public class RefreshNodesRequestPBImpl extends RefreshNodesRequest {
     return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
     return p.hasDecommissionTimeout()? p.getDecommissionTimeout() : null;
   }
   }
 
 
+  @Override
+  public synchronized String getSubClusterId() {
+    RefreshNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
+  }
+
+  @Override
+  public synchronized void setSubClusterId(String subClusterId) {
+    maybeInitBuilder();
+    if (subClusterId == null) {
+      builder.clearSubClusterId();
+      return;
+    }
+    builder.setSubClusterId(subClusterId);
+  }
+
   private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
   private DecommissionType convertFromProtoFormat(DecommissionTypeProto p) {
     return DecommissionType.valueOf(p.name());
     return DecommissionType.valueOf(p.name());
   }
   }

+ 29 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RefreshQueuesRequestPBImpl.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshQueuesRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 
 
 import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@@ -29,9 +30,9 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
 @Unstable
 @Unstable
 public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest {
 public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest {
 
 
-  RefreshQueuesRequestProto proto = RefreshQueuesRequestProto.getDefaultInstance();
-  RefreshQueuesRequestProto.Builder builder = null;
-  boolean viaProto = false;
+  private RefreshQueuesRequestProto proto = RefreshQueuesRequestProto.getDefaultInstance();
+  private RefreshQueuesRequestProto.Builder builder = null;
+  private boolean viaProto = false;
   
   
   public RefreshQueuesRequestPBImpl() {
   public RefreshQueuesRequestPBImpl() {
     builder = RefreshQueuesRequestProto.newBuilder();
     builder = RefreshQueuesRequestProto.newBuilder();
@@ -55,8 +56,9 @@ public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest {
 
 
   @Override
   @Override
   public boolean equals(Object other) {
   public boolean equals(Object other) {
-    if (other == null)
+    if (other == null) {
       return false;
       return false;
+    }
     if (other.getClass().isAssignableFrom(this.getClass())) {
     if (other.getClass().isAssignableFrom(this.getClass())) {
       return this.getProto().equals(this.getClass().cast(other).getProto());
       return this.getProto().equals(this.getClass().cast(other).getProto());
     }
     }
@@ -67,4 +69,27 @@ public class RefreshQueuesRequestPBImpl extends RefreshQueuesRequest {
   public String toString() {
   public String toString() {
     return TextFormat.shortDebugString(getProto());
     return TextFormat.shortDebugString(getProto());
   }
   }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = RefreshQueuesRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public String getSubClusterId() {
+    RefreshQueuesRequestProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
+  }
+
+  @Override
+  public void setSubClusterId(String clusterId) {
+    maybeInitBuilder();
+    if (clusterId == null) {
+      builder.clearSubClusterId();
+      return;
+    }
+    builder.setSubClusterId(clusterId);
+  }
 }
 }

+ 35 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -127,6 +127,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
   private MutableGaugeInt numGetRMNodeLabelsFailedRetrieved;
   @Metric("# of checkUserAccessToQueue failed to be retrieved")
   @Metric("# of checkUserAccessToQueue failed to be retrieved")
   private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
   private MutableGaugeInt numCheckUserAccessToQueueFailedRetrieved;
+  @Metric("# of refreshNodes failed to be retrieved")
+  private MutableGaugeInt numRefreshNodesFailedRetrieved;
   @Metric("# of getDelegationToken failed to be retrieved")
   @Metric("# of getDelegationToken failed to be retrieved")
   private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
   private MutableGaugeInt numGetDelegationTokenFailedRetrieved;
   @Metric("# of renewDelegationToken failed to be retrieved")
   @Metric("# of renewDelegationToken failed to be retrieved")
@@ -221,6 +223,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
   private MutableRate totalSucceededGetRMNodeLabelsRetrieved;
   @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
   @Metric("Total number of successful Retrieved CheckUserAccessToQueue and latency(ms)")
   private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
   private MutableRate totalSucceededCheckUserAccessToQueueRetrieved;
+  @Metric("Total number of successful Retrieved RefreshNodes and latency(ms)")
+  private MutableRate totalSucceededRefreshNodesRetrieved;
   @Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
   @Metric("Total number of successful Retrieved GetDelegationToken and latency(ms)")
   private MutableRate totalSucceededGetDelegationTokenRetrieved;
   private MutableRate totalSucceededGetDelegationTokenRetrieved;
   @Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
   @Metric("Total number of successful Retrieved RenewDelegationToken and latency(ms)")
@@ -271,9 +275,10 @@ public final class RouterMetrics {
   private MutableQuantiles getUpdateQueueLatency;
   private MutableQuantiles getUpdateQueueLatency;
   private MutableQuantiles getAppTimeoutLatency;
   private MutableQuantiles getAppTimeoutLatency;
   private MutableQuantiles getAppTimeoutsLatency;
   private MutableQuantiles getAppTimeoutsLatency;
-  private MutableQuantiles getRefreshQueuesLatency;
+  private MutableQuantiles refreshQueuesLatency;
   private MutableQuantiles getRMNodeLabelsLatency;
   private MutableQuantiles getRMNodeLabelsLatency;
   private MutableQuantiles checkUserAccessToQueueLatency;
   private MutableQuantiles checkUserAccessToQueueLatency;
+  private MutableQuantiles refreshNodesLatency;
   private MutableQuantiles getDelegationTokenLatency;
   private MutableQuantiles getDelegationTokenLatency;
   private MutableQuantiles renewDelegationTokenLatency;
   private MutableQuantiles renewDelegationTokenLatency;
   private MutableQuantiles cancelDelegationTokenLatency;
   private MutableQuantiles cancelDelegationTokenLatency;
@@ -430,7 +435,7 @@ public final class RouterMetrics {
     getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
     getAppTimeoutsLatency = registry.newQuantiles("getAppTimeoutsLatency",
          "latency of get apptimeouts timeouts", "ops", "latency", 10);
          "latency of get apptimeouts timeouts", "ops", "latency", 10);
 
 
-    getRefreshQueuesLatency = registry.newQuantiles("getRefreshQueuesLatency",
+    refreshQueuesLatency = registry.newQuantiles("refreshQueuesLatency",
          "latency of get refresh queues timeouts", "ops", "latency", 10);
          "latency of get refresh queues timeouts", "ops", "latency", 10);
 
 
     getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency",
     getRMNodeLabelsLatency = registry.newQuantiles("getRMNodeLabelsLatency",
@@ -439,6 +444,9 @@ public final class RouterMetrics {
     checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
     checkUserAccessToQueueLatency = registry.newQuantiles("checkUserAccessToQueueLatency",
         "latency of get apptimeouts timeouts", "ops", "latency", 10);
         "latency of get apptimeouts timeouts", "ops", "latency", 10);
 
 
+    refreshNodesLatency = registry.newQuantiles("refreshNodesLatency",
+        "latency of get refresh nodes timeouts", "ops", "latency", 10);
+
     getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
     getDelegationTokenLatency = registry.newQuantiles("getDelegationTokenLatency",
         "latency of get delegation token timeouts", "ops", "latency", 10);
         "latency of get delegation token timeouts", "ops", "latency", 10);
 
 
@@ -447,6 +455,7 @@ public final class RouterMetrics {
 
 
     cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
     cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
         "latency of cancel delegation token timeouts", "ops", "latency", 10);
         "latency of cancel delegation token timeouts", "ops", "latency", 10);
+
   }
   }
 
 
   public static RouterMetrics getMetrics() {
   public static RouterMetrics getMetrics() {
@@ -673,6 +682,11 @@ public final class RouterMetrics {
     return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples();
     return totalSucceededRefreshQueuesRetrieved.lastStat().numSamples();
   }
   }
 
 
+  @VisibleForTesting
+  public long getNumSucceededRefreshNodesRetrieved() {
+    return totalSucceededRefreshNodesRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   @VisibleForTesting
   public long getNumSucceededGetRMNodeLabelsRetrieved() {
   public long getNumSucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples();
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().numSamples();
@@ -903,6 +917,11 @@ public final class RouterMetrics {
     return totalSucceededRefreshQueuesRetrieved.lastStat().mean();
     return totalSucceededRefreshQueuesRetrieved.lastStat().mean();
   }
   }
 
 
+  @VisibleForTesting
+  public double getLatencySucceededRefreshNodesRetrieved() {
+    return totalSucceededRefreshNodesRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   @VisibleForTesting
   public double getLatencySucceededGetRMNodeLabelsRetrieved() {
   public double getLatencySucceededGetRMNodeLabelsRetrieved() {
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean();
     return totalSucceededGetRMNodeLabelsRetrieved.lastStat().mean();
@@ -1122,6 +1141,10 @@ public final class RouterMetrics {
     return numCheckUserAccessToQueueFailedRetrieved.value();
     return numCheckUserAccessToQueueFailedRetrieved.value();
   }
   }
 
 
+  public int getNumRefreshNodesFailedRetrieved() {
+    return numRefreshNodesFailedRetrieved.value();
+  }
+
   public int getDelegationTokenFailedRetrieved() {
   public int getDelegationTokenFailedRetrieved() {
     return numGetDelegationTokenFailedRetrieved.value();
     return numGetDelegationTokenFailedRetrieved.value();
   }
   }
@@ -1336,7 +1359,12 @@ public final class RouterMetrics {
 
 
   public void succeededRefreshQueuesRetrieved(long duration) {
   public void succeededRefreshQueuesRetrieved(long duration) {
     totalSucceededRefreshQueuesRetrieved.add(duration);
     totalSucceededRefreshQueuesRetrieved.add(duration);
-    getRefreshQueuesLatency.add(duration);
+    refreshQueuesLatency.add(duration);
+  }
+
+  public void succeededRefreshNodesRetrieved(long duration) {
+    totalSucceededRefreshNodesRetrieved.add(duration);
+    refreshNodesLatency.add(duration);
   }
   }
 
 
   public void succeededGetRMNodeLabelsRetrieved(long duration) {
   public void succeededGetRMNodeLabelsRetrieved(long duration) {
@@ -1536,6 +1564,10 @@ public final class RouterMetrics {
     numCheckUserAccessToQueueFailedRetrieved.incr();
     numCheckUserAccessToQueueFailedRetrieved.incr();
   }
   }
 
 
+  public void incrRefreshNodesFailedRetrieved() {
+    numRefreshNodesFailedRetrieved.incr();
+  }
+
   public void incrGetDelegationTokenFailedRetrieved() {
   public void incrGetDelegationTokenFailedRetrieved() {
     numGetDelegationTokenFailedRetrieved.incr();
     numGetDelegationTokenFailedRetrieved.incr();
   }
   }

+ 69 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java

@@ -145,6 +145,23 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
        + "is correct");
        + "is correct");
   }
   }
 
 
+  /**
+   * Refresh queue requests.
+   *
+   * The Router supports refreshing all SubCluster queues at once,
+   * and also supports refreshing queues by SubCluster.
+   *
+   * @param request RefreshQueuesRequest, If subClusterId is not empty,
+   * it means that we want to refresh the queue of the specified subClusterId.
+   * If subClusterId is empty, it means we want to refresh all queues.
+   *
+   * @return RefreshQueuesResponse, There is no specific information in the response,
+   * as long as it is not empty, it means that the request is successful.
+   *
+   * @throws StandbyException exception thrown by non-active server.
+   * @throws YarnException indicates exceptions from yarn servers.
+   * @throws IOException io error occurs.
+   */
   @Override
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws StandbyException, YarnException, IOException {
       throws StandbyException, YarnException, IOException {
@@ -161,8 +178,9 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
       RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
       RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
            new Class[] {RefreshQueuesRequest.class}, new Object[] {request});
            new Class[] {RefreshQueuesRequest.class}, new Object[] {request});
 
 
+      String subClusterId = request.getSubClusterId();
       Collection<RefreshQueuesResponse> refreshQueueResps =
       Collection<RefreshQueuesResponse> refreshQueueResps =
-          remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class);
+          remoteMethod.invokeConcurrent(this, RefreshQueuesResponse.class, subClusterId);
 
 
       // If we get the return result from refreshQueueResps,
       // If we get the return result from refreshQueueResps,
       // it means that the call has been successful,
       // it means that the call has been successful,
@@ -172,19 +190,66 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
         routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime);
         routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime);
         return RefreshQueuesResponse.newInstance();
         return RefreshQueuesResponse.newInstance();
       }
       }
-    } catch (Exception e) {
+    } catch (YarnException e) {
       routerMetrics.incrRefreshQueuesFailedRetrieved();
       routerMetrics.incrRefreshQueuesFailedRetrieved();
-      RouterServerUtil.logAndThrowException("Unable to refreshQueue to exception.", e);
+      RouterServerUtil.logAndThrowException(e, "Unable to refreshQueue due to exception.");
     }
     }
 
 
     routerMetrics.incrRefreshQueuesFailedRetrieved();
     routerMetrics.incrRefreshQueuesFailedRetrieved();
     throw new YarnException("Unable to refreshQueue.");
     throw new YarnException("Unable to refreshQueue.");
   }
   }
 
 
+  /**
+   * Refresh node requests.
+   *
+   * The Router supports refreshing all SubCluster nodes at once,
+   * and also supports refreshing node by SubCluster.
+   *
+   * @param request RefreshNodesRequest, If subClusterId is not empty,
+   * it means that we want to refresh the node of the specified subClusterId.
+   * If subClusterId is empty, it means we want to refresh all nodes.
+   *
+   * @return RefreshNodesResponse, There is no specific information in the response,
+   * as long as it is not empty, it means that the request is successful.
+   *
+   * @throws StandbyException exception thrown by non-active server.
+   * @throws YarnException indicates exceptions from yarn servers.
+   * @throws IOException io error occurs.
+   */
   @Override
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws StandbyException, YarnException, IOException {
       throws StandbyException, YarnException, IOException {
-    throw new NotImplementedException();
+
+    // parameter verification.
+    // We will not check whether the DecommissionType is empty,
+    // because this parameter has a default value at the proto level.
+    if (request == null) {
+      routerMetrics.incrRefreshNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing RefreshNodes request.", null);
+    }
+
+    // call refreshNodes of activeSubClusters.
+    try {
+      long startTime = clock.getTime();
+      RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
+          new Class[] {RefreshNodesRequest.class}, new Object[] {request});
+
+      String subClusterId = request.getSubClusterId();
+      Collection<RefreshNodesResponse> refreshNodesResps =
+          remoteMethod.invokeConcurrent(this, RefreshNodesResponse.class, subClusterId);
+
+      if (CollectionUtils.isNotEmpty(refreshNodesResps)) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededRefreshNodesRetrieved(stopTime - startTime);
+        return RefreshNodesResponse.newInstance();
+      }
+    } catch (YarnException e) {
+      routerMetrics.incrRefreshNodesFailedRetrieved();
+      RouterServerUtil.logAndThrowException(e, "Unable to refreshNodes due to exception.");
+    }
+
+    routerMetrics.incrRefreshNodesFailedRetrieved();
+    throw new YarnException("Unable to refreshNodes.");
   }
   }
 
 
   @Override
   @Override

+ 58 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminProtocolMethod.java

@@ -37,12 +37,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeMap;
 import java.util.List;
 import java.util.List;
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
 
 
-
 /**
 /**
  * Class to define admin method, params and arguments.
  * Class to define admin method, params and arguments.
  */
  */
@@ -61,11 +61,15 @@ public class RMAdminProtocolMethod extends FederationMethodWrapper {
   }
   }
 
 
   public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor,
   public <R> Collection<R> invokeConcurrent(FederationRMAdminInterceptor interceptor,
-      Class<R> clazz) throws YarnException {
+      Class<R> clazz, String subClusterId) throws YarnException {
     this.rmAdminInterceptor = interceptor;
     this.rmAdminInterceptor = interceptor;
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.configuration = interceptor.getConf();
     this.configuration = interceptor.getConf();
-    return invokeConcurrent(clazz);
+    if (StringUtils.isNotBlank(subClusterId)) {
+      return invoke(clazz, subClusterId);
+    } else {
+      return invokeConcurrent(clazz);
+    }
   }
   }
 
 
   @Override
   @Override
@@ -107,7 +111,10 @@ public class RMAdminProtocolMethod extends FederationMethodWrapper {
           Pair<SubClusterId, Object> pair = future.get();
           Pair<SubClusterId, Object> pair = future.get();
           subClusterId = pair.getKey();
           subClusterId = pair.getKey();
           Object result = pair.getValue();
           Object result = pair.getValue();
-          results.put(subClusterId, clazz.cast(result));
+          if (result != null) {
+            R rResult = clazz.cast(result);
+            results.put(subClusterId, rResult);
+          }
         } catch (InterruptedException | ExecutionException e) {
         } catch (InterruptedException | ExecutionException e) {
           Throwable cause = e.getCause();
           Throwable cause = e.getCause();
           LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage());
           LOG.error("Cannot execute {} on {}: {}", methodName, subClusterId, cause.getMessage());
@@ -129,4 +136,51 @@ public class RMAdminProtocolMethod extends FederationMethodWrapper {
     // return result
     // return result
     return results.values();
     return results.values();
   }
   }
+
+  /**
+   * Call the method in the protocol according to the subClusterId.
+   *
+   * @param clazz return type
+   * @param subClusterId subCluster Id
+   * @param <R> Generic R
+   * @return response collection.
+   * @throws YarnException yarn exception.
+   */
+  protected <R> Collection<R> invoke(Class<R> clazz, String subClusterId) throws YarnException {
+
+    // Get the method name to call
+    String methodName = Thread.currentThread().getStackTrace()[3].getMethodName();
+    this.setMethodName(methodName);
+
+    // Get Active SubClusters
+    Map<SubClusterId, SubClusterInfo> subClusterInfoMap =
+        federationFacade.getSubClusters(true);
+
+    // According to subCluster of string type, convert to SubClusterId type
+    SubClusterId subClusterIdKey = SubClusterId.newInstance(subClusterId);
+
+    // If the provided subCluster is not Active or does not exist,
+    // an exception will be returned directly.
+    if (!subClusterInfoMap.containsKey(subClusterIdKey)) {
+      throw new YarnException("subClusterId = " + subClusterId + " is not an active subCluster.");
+    }
+
+    // Call the method in the protocol and convert it according to clazz.
+    try {
+      ResourceManagerAdministrationProtocol protocol =
+          rmAdminInterceptor.getAdminRMProxyForSubCluster(subClusterIdKey);
+      Class<?>[] types = this.getTypes();
+      Object[] params = this.getParams();
+      Method method = ResourceManagerAdministrationProtocol.class.getMethod(methodName, types);
+      Object result = method.invoke(protocol, params);
+      if (result != null) {
+        return Collections.singletonList(clazz.cast(result));
+      }
+    } catch (Exception e) {
+      throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
+          subClusterId, e);
+    }
+    throw new YarnException("invoke Failed, An exception occurred in subClusterId = " +
+        subClusterId);
+  }
 }
 }

+ 67 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java

@@ -19,8 +19,11 @@
 package org.apache.hadoop.yarn.server.router.rmadmin;
 package org.apache.hadoop.yarn.server.router.rmadmin;
 
 
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.yarn.api.records.DecommissionType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -31,7 +34,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 
 
@@ -77,7 +79,7 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
     subClusters = new ArrayList<>();
     subClusters = new ArrayList<>();
     try {
     try {
       for (int i = 0; i < NUM_SUBCLUSTER; i++) {
       for (int i = 0; i < NUM_SUBCLUSTER; i++) {
-        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        SubClusterId sc = SubClusterId.newInstance("SC-" + i);
         stateStoreUtil.registerSubCluster(sc);
         stateStoreUtil.registerSubCluster(sc);
         subClusters.add(sc);
         subClusters.add(sc);
       }
       }
@@ -114,8 +116,70 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
   }
   }
 
 
   @Test
   @Test
-  public void testRefreshQueues() throws IOException, YarnException {
+  public void testRefreshQueues() throws Exception {
+    // We will test 2 cases:
+    // case 1, request is null.
+    // case 2, normal request.
+    // If the request is null, a Missing RefreshQueues request exception will be thrown.
+
+    // null request.
+    LambdaTestUtils.intercept(YarnException.class, "Missing RefreshQueues request.",
+        () -> interceptor.refreshQueues(null));
+
+    // normal request.
     RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
     RefreshQueuesRequest request = RefreshQueuesRequest.newInstance();
     interceptor.refreshQueues(request);
     interceptor.refreshQueues(request);
   }
   }
+
+  @Test
+  public void testSC1RefreshQueues() throws Exception {
+    // We will test 2 cases:
+    // case 1, test the existing subCluster (SC-1).
+    // case 2, test the non-exist subCluster.
+
+    String existSubCluster = "SC-1";
+    RefreshQueuesRequest request = RefreshQueuesRequest.newInstance(existSubCluster);
+    interceptor.refreshQueues(request);
+
+    String notExistsSubCluster = "SC-NON";
+    RefreshQueuesRequest request1 = RefreshQueuesRequest.newInstance(notExistsSubCluster);
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId = SC-NON is not an active subCluster.",
+        () -> interceptor.refreshQueues(request1));
+  }
+
+  @Test
+  public void testRefreshNodes() throws Exception {
+    // We will test 2 cases:
+    // case 1, request is null.
+    // case 2, normal request.
+    // If the request is null, a Missing RefreshNodes request exception will be thrown.
+
+    // null request.
+    LambdaTestUtils.intercept(YarnException.class,
+        "Missing RefreshNodes request.", () -> interceptor.refreshNodes(null));
+
+    // normal request.
+    RefreshNodesRequest request = RefreshNodesRequest.newInstance(DecommissionType.NORMAL);
+    interceptor.refreshNodes(request);
+  }
+
+  @Test
+  public void testSC1RefreshNodes() throws Exception {
+
+    // We will test 2 cases:
+    // case 1, test the existing subCluster (SC-1).
+    // case 2, test the non-exist subCluster.
+
+    RefreshNodesRequest request =
+        RefreshNodesRequest.newInstance(DecommissionType.NORMAL, 10, "SC-1");
+    interceptor.refreshNodes(request);
+
+    String notExistsSubCluster = "SC-NON";
+    RefreshNodesRequest request1 = RefreshNodesRequest.newInstance(
+        DecommissionType.NORMAL, 10, notExistsSubCluster);
+    LambdaTestUtils.intercept(YarnException.class,
+        "subClusterId = SC-NON is not an active subCluster.",
+        () -> interceptor.refreshNodes(request1));
+  }
 }
 }