Browse Source

YARN-11218. [Federation] Add getActivities, getBulkActivities REST APIs for Router. (#5284)

slfan1989 2 years ago
parent
commit
468135a4d9

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java

@@ -34,6 +34,8 @@ public class BulkActivitiesInfo {
 
   private ArrayList<ActivitiesInfo> activities = new ArrayList<>();
 
+  private String subClusterId;
+
   public BulkActivitiesInfo() {
     // JAXB needs this
   }
@@ -49,4 +51,12 @@ public class BulkActivitiesInfo {
   public void addAll(List<ActivitiesInfo> activitiesInfoList) {
     activities.addAll(activitiesInfoList);
   }
+
+  public String getSubClusterId() {
+    return subClusterId;
+  }
+
+  public void setSubClusterId(String subClusterId) {
+    this.subClusterId = subClusterId;
+  }
 }

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java

@@ -135,6 +135,10 @@ public final class RouterMetrics {
   private MutableGaugeInt numRenewDelegationTokenFailedRetrieved;
   @Metric("# of renewDelegationToken failed to be retrieved")
   private MutableGaugeInt numCancelDelegationTokenFailedRetrieved;
+  @Metric("# of getActivities failed to be retrieved")
+  private MutableGaugeInt numGetActivitiesFailedRetrieved;
+  @Metric("# of getBulkActivities failed to be retrieved")
+  private MutableGaugeInt numGetBulkActivitiesFailedRetrieved;
   @Metric("# of getSchedulerInfo failed to be retrieved")
   private MutableGaugeInt numGetSchedulerInfoFailedRetrieved;
   @Metric("# of refreshSuperUserGroupsConfiguration failed to be retrieved")
@@ -237,6 +241,10 @@ public final class RouterMetrics {
   private MutableRate totalSucceededRenewDelegationTokenRetrieved;
   @Metric("Total number of successful Retrieved CancelDelegationToken and latency(ms)")
   private MutableRate totalSucceededCancelDelegationTokenRetrieved;
+  @Metric("Total number of successful Retrieved GetActivities and latency(ms)")
+  private MutableRate totalSucceededGetActivitiesRetrieved;
+  @Metric("Total number of successful Retrieved GetBulkActivities and latency(ms)")
+  private MutableRate totalSucceededGetBulkActivitiesRetrieved;
   @Metric("Total number of successful Retrieved RefreshSuperUserGroupsConfig and latency(ms)")
   private MutableRate totalSucceededRefreshSuperUserGroupsConfigurationRetrieved;
   @Metric("Total number of successful Retrieved RefreshUserToGroupsMappings and latency(ms)")
@@ -295,6 +303,8 @@ public final class RouterMetrics {
   private MutableQuantiles getDelegationTokenLatency;
   private MutableQuantiles renewDelegationTokenLatency;
   private MutableQuantiles cancelDelegationTokenLatency;
+  private MutableQuantiles getActivitiesLatency;
+  private MutableQuantiles getBulkActivitiesLatency;
   private MutableQuantiles getSchedulerInfoRetrievedLatency;
   private MutableQuantiles refreshSuperUserGroupsConfLatency;
   private MutableQuantiles refreshUserToGroupsMappingsLatency;
@@ -472,6 +482,12 @@ public final class RouterMetrics {
     cancelDelegationTokenLatency = registry.newQuantiles("cancelDelegationTokenLatency",
         "latency of cancel delegation token timeouts", "ops", "latency", 10);
 
+    getActivitiesLatency = registry.newQuantiles("getActivitiesLatency",
+        "latency of get activities timeouts", "ops", "latency", 10);
+
+    getBulkActivitiesLatency = registry.newQuantiles("getBulkActivitiesLatency",
+         "latency of get bulk activities timeouts", "ops", "latency", 10);
+
     getSchedulerInfoRetrievedLatency = registry.newQuantiles("getSchedulerInfoRetrievedLatency",
         "latency of get scheduler info timeouts", "ops", "latency", 10);
 
@@ -736,6 +752,16 @@ public final class RouterMetrics {
     return totalSucceededCancelDelegationTokenRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededGetActivitiesRetrieved() {
+    return totalSucceededGetActivitiesRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededGetBulkActivitiesRetrieved() {
+    return totalSucceededGetBulkActivitiesRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public long getNumSucceededGetSchedulerInfoRetrieved() {
     return totalSucceededGetSchedulerInfoRetrieved.lastStat().numSamples();
@@ -981,6 +1007,16 @@ public final class RouterMetrics {
     return totalSucceededCancelDelegationTokenRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededGetActivitiesRetrieved() {
+    return totalSucceededGetActivitiesRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetBulkActivitiesRetrieved() {
+    return totalSucceededGetBulkActivitiesRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededGetSchedulerInfoRetrieved() {
     return totalSucceededGetSchedulerInfoRetrieved.lastStat().mean();
@@ -1209,6 +1245,14 @@ public final class RouterMetrics {
     return numCancelDelegationTokenFailedRetrieved.value();
   }
 
+  public int getActivitiesFailedRetrieved() {
+    return numGetActivitiesFailedRetrieved.value();
+  }
+
+  public int getBulkActivitiesFailedRetrieved(){
+    return numGetBulkActivitiesFailedRetrieved.value();
+  }
+
   public int getSchedulerInfoFailedRetrieved() {
     return numGetSchedulerInfoFailedRetrieved.value();
   }
@@ -1448,6 +1492,16 @@ public final class RouterMetrics {
     cancelDelegationTokenLatency.add(duration);
   }
 
+  public void succeededGetActivitiesLatencyRetrieved(long duration) {
+    totalSucceededGetActivitiesRetrieved.add(duration);
+    getActivitiesLatency.add(duration);
+  }
+
+  public void succeededGetBulkActivitiesRetrieved(long duration) {
+    totalSucceededGetBulkActivitiesRetrieved.add(duration);
+    getBulkActivitiesLatency.add(duration);
+  }
+
   public void succeededGetSchedulerInfoRetrieved(long duration) {
     totalSucceededGetSchedulerInfoRetrieved.add(duration);
     getSchedulerInfoRetrievedLatency.add(duration);
@@ -1659,6 +1713,14 @@ public final class RouterMetrics {
     numCancelDelegationTokenFailedRetrieved.incr();
   }
 
+  public void incrGetActivitiesFailedRetrieved() {
+    numGetActivitiesFailedRetrieved.incr();
+  }
+
+  public void incrGetBulkActivitiesFailedRetrieved() {
+    numGetBulkActivitiesFailedRetrieved.incr();
+  }
+
   public void incrGetSchedulerInfoFailedRetrieved() {
     numGetSchedulerInfoFailedRetrieved.incr();
   }

+ 98 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -46,6 +46,7 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.prefetch.Validate;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -121,6 +122,7 @@ import org.apache.hadoop.yarn.server.router.clientrm.ClientMethod;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
 import org.apache.hadoop.yarn.server.router.security.RouterDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.router.webapp.cache.RouterAppInfoCacheKey;
+import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
 import org.apache.hadoop.yarn.server.router.webapp.dao.SubClusterResult;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
@@ -1187,16 +1189,110 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     throw new NotImplementedException("Code is not implemented");
   }
 
+  /**
+   * This method retrieve all the activities in a specific node, and it is
+   * reachable by using {@link RMWSConsts#SCHEDULER_ACTIVITIES}.
+   *
+   * @param hsr the servlet request
+   * @param nodeId the node we want to retrieve the activities. It is a
+   *          QueryParam.
+   * @param groupBy the groupBy type by which the activities should be
+   *          aggregated. It is a QueryParam.
+   * @return all the activities in the specific node
+   */
   @Override
   public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
       String groupBy) {
-    throw new NotImplementedException("Code is not implemented");
+    try {
+      // Check the parameters to ensure that the parameters are not empty
+      Validate.checkNotNullAndNotEmpty(nodeId, "nodeId");
+      Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
+
+      // Query SubClusterInfo according to id,
+      // if the nodeId cannot get SubClusterInfo, an exception will be thrown directly.
+      SubClusterInfo subClusterInfo = getNodeSubcluster(nodeId);
+
+      // Call the corresponding subCluster to get ActivitiesInfo.
+      long startTime = clock.getTime();
+      DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+          subClusterInfo.getSubClusterId(), subClusterInfo.getRMWebServiceAddress());
+      final HttpServletRequest hsrCopy = clone(hsr);
+      ActivitiesInfo activitiesInfo = interceptor.getActivities(hsrCopy, nodeId, groupBy);
+      if (activitiesInfo != null) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededGetActivitiesLatencyRetrieved(stopTime - startTime);
+        return activitiesInfo;
+      }
+    } catch (IllegalArgumentException e) {
+      routerMetrics.incrGetActivitiesFailedRetrieved();
+      throw e;
+    } catch (NotFoundException e) {
+      routerMetrics.incrGetActivitiesFailedRetrieved();
+      throw e;
+    }
+
+    routerMetrics.incrGetActivitiesFailedRetrieved();
+    throw new RuntimeException("getActivities Failed.");
   }
 
+  /**
+   * This method retrieve the last n activities inside scheduler, and it is
+   * reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
+   *
+   * @param hsr the servlet request
+   * @param groupBy the groupBy type by which the activities should be
+   *        aggregated. It is a QueryParam.
+   * @param activitiesCount number of activities
+   * @return last n activities
+   */
   @Override
   public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
       String groupBy, int activitiesCount) throws InterruptedException {
-    throw new NotImplementedException("Code is not implemented");
+    try {
+      // Step1. Check the parameters to ensure that the parameters are not empty
+      Validate.checkNotNullAndNotEmpty(groupBy, "groupBy");
+      Validate.checkNotNegative(activitiesCount, "activitiesCount");
+
+      // Step2. Call the interface of subCluster concurrently and get the returned result.
+      Map<SubClusterId, SubClusterInfo> subClustersActive = getActiveSubclusters();
+      final HttpServletRequest hsrCopy = clone(hsr);
+      Class[] argsClasses = new Class[]{HttpServletRequest.class, String.class, int.class};
+      Object[] args = new Object[]{hsrCopy, groupBy, activitiesCount};
+      ClientMethod remoteMethod = new ClientMethod("getBulkActivities", argsClasses, args);
+      Map<SubClusterInfo, BulkActivitiesInfo> appStatisticsMap = invokeConcurrent(
+          subClustersActive.values(), remoteMethod, BulkActivitiesInfo.class);
+
+      // Step3. Generate Federation objects and set subCluster information.
+      long startTime = clock.getTime();
+      FederationBulkActivitiesInfo fedBulkActivitiesInfo = new FederationBulkActivitiesInfo();
+      appStatisticsMap.forEach((subClusterInfo, bulkActivitiesInfo) -> {
+        SubClusterId subClusterId = subClusterInfo.getSubClusterId();
+        bulkActivitiesInfo.setSubClusterId(subClusterId.getId());
+        fedBulkActivitiesInfo.getList().add(bulkActivitiesInfo);
+      });
+      long stopTime = clock.getTime();
+      routerMetrics.succeededGetBulkActivitiesRetrieved(stopTime - startTime);
+      return fedBulkActivitiesInfo;
+    } catch (IllegalArgumentException e) {
+      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
+      throw e;
+    } catch (NotFoundException e) {
+      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
+      RouterServerUtil.logAndThrowRunTimeException("get all active sub cluster(s) error.", e);
+    } catch (IOException e) {
+      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
+      RouterServerUtil.logAndThrowRunTimeException(e,
+          "getBulkActivities by groupBy = %s, activitiesCount = %s with io error.",
+          groupBy, String.valueOf(activitiesCount));
+    } catch (YarnException e) {
+      routerMetrics.incrGetBulkActivitiesFailedRetrieved();
+      RouterServerUtil.logAndThrowRunTimeException(e,
+          "getBulkActivities by groupBy = %s, activitiesCount = %s with yarn error.",
+          groupBy, String.valueOf(activitiesCount));
+    }
+
+    routerMetrics.incrGetBulkActivitiesFailedRetrieved();
+    throw new RuntimeException("getBulkActivities Failed.");
   }
 
   @Override

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/dao/FederationBulkActivitiesInfo.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.webapp.dao;
+
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class FederationBulkActivitiesInfo extends BulkActivitiesInfo {
+
+  @XmlElement(name = "subCluster")
+  private ArrayList<BulkActivitiesInfo> list = new ArrayList<>();
+
+  public FederationBulkActivitiesInfo() {
+  } // JAXB needs this
+
+  public FederationBulkActivitiesInfo(ArrayList<BulkActivitiesInfo> list) {
+    this.list = list;
+  }
+
+  public ArrayList<BulkActivitiesInfo> getList() {
+    return list;
+  }
+
+  public void setList(ArrayList<BulkActivitiesInfo> list) {
+    this.list = list;
+  }
+}

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java

@@ -533,6 +533,16 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed renewDelegationToken call");
       metrics.incrRenewDelegationTokenFailedRetrieved();
     }
+
+    public void getActivitiesFailed() {
+      LOG.info("Mocked: failed getBulkActivitie call");
+      metrics.incrGetActivitiesFailedRetrieved();
+    }
+
+    public void getBulkActivitiesFailed() {
+      LOG.info("Mocked: failed getBulkActivitie call");
+      metrics.incrGetBulkActivitiesFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -763,6 +773,16 @@ public class TestRouterMetrics {
       LOG.info("Mocked: successful RenewDelegationToken call with duration {}", duration);
       metrics.succeededRenewDelegationTokenRetrieved(duration);
     }
+
+    public void getActivitiesRetrieved(long duration) {
+      LOG.info("Mocked: successful GetActivities call with duration {}", duration);
+      metrics.succeededGetActivitiesLatencyRetrieved(duration);
+    }
+
+    public void getBulkActivitiesRetrieved(long duration) {
+      LOG.info("Mocked: successful GetBulkActivities call with duration {}", duration);
+      metrics.succeededGetBulkActivitiesRetrieved(duration);
+    }
   }
 
   @Test
@@ -1597,4 +1617,50 @@ public class TestRouterMetrics {
     Assert.assertEquals(totalBadBefore + 1,
         metrics.getRenewDelegationTokenFailedRetrieved());
   }
+
+  @Test
+  public void testGetActivitiesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededGetActivitiesRetrieved();
+    goodSubCluster.getActivitiesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetActivitiesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getActivitiesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetActivitiesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetActivitiesRetrievedFailed() {
+    long totalBadBefore = metrics.getActivitiesFailedRetrieved();
+    badSubCluster.getActivitiesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getActivitiesFailedRetrieved());
+  }
+
+  @Test
+  public void testGetBulkActivitiesRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededGetBulkActivitiesRetrieved();
+    goodSubCluster.getBulkActivitiesRetrieved(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededGetBulkActivitiesRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededGetBulkActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getBulkActivitiesRetrieved(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededGetBulkActivitiesRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededGetBulkActivitiesRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testGetBulkActivitiesRetrievedFailed() {
+    long totalBadBefore = metrics.getBulkActivitiesFailedRetrieved();
+    badSubCluster.getBulkActivitiesFailed();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getBulkActivitiesFailedRetrieved());
+  }
 }

+ 63 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java

@@ -137,6 +137,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationReque
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateResponseInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteResponseInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.PartitionInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.RMQueueAclInfo;
@@ -1213,6 +1217,65 @@ public class MockDefaultRequestInterceptorREST
   }
 
   @Override
+  public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, String groupBy) {
+    if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) {
+      String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: "
+          + Arrays.asList(RMWSConsts.ActivitiesGroupBy.values());
+      throw new IllegalArgumentException(errMessage);
+    }
+
+    SubClusterId subClusterId = getSubClusterId();
+    ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class);
+    Mockito.when(activitiesInfo.getNodeId()).thenReturn(nodeId);
+    Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L);
+    Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId());
+
+    List<NodeAllocationInfo> allocationInfos = new ArrayList<>();
+    NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class);
+    Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId());
+    Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED");
+
+    allocationInfos.add(nodeAllocationInfo);
+    Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos);
+    return activitiesInfo;
+  }
+
+  @Override
+  public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+      String groupBy, int activitiesCount) {
+
+    if (activitiesCount <= 0) {
+      throw new IllegalArgumentException("activitiesCount needs to be greater than 0.");
+    }
+
+    if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, groupBy.toUpperCase())) {
+      String errMessage = "Got invalid groupBy: " + groupBy + ", valid groupBy types: "
+          + Arrays.asList(RMWSConsts.ActivitiesGroupBy.values());
+      throw new IllegalArgumentException(errMessage);
+    }
+
+    BulkActivitiesInfo bulkActivitiesInfo = new BulkActivitiesInfo();
+
+    for (int i = 0; i < activitiesCount; i++) {
+      SubClusterId subClusterId = getSubClusterId();
+      ActivitiesInfo activitiesInfo = mock(ActivitiesInfo.class);
+      Mockito.when(activitiesInfo.getNodeId()).thenReturn(subClusterId + "-nodeId-" + i);
+      Mockito.when(activitiesInfo.getTimestamp()).thenReturn(1673081972L);
+      Mockito.when(activitiesInfo.getDiagnostic()).thenReturn("Diagnostic:" + subClusterId.getId());
+
+      List<NodeAllocationInfo> allocationInfos = new ArrayList<>();
+      NodeAllocationInfo nodeAllocationInfo = mock(NodeAllocationInfo.class);
+      Mockito.when(nodeAllocationInfo.getPartition()).thenReturn("p" + subClusterId.getId());
+      Mockito.when(nodeAllocationInfo.getFinalAllocationState()).thenReturn("ALLOCATED");
+
+      allocationInfos.add(nodeAllocationInfo);
+      Mockito.when(activitiesInfo.getAllocations()).thenReturn(allocationInfos);
+      bulkActivitiesInfo.getActivities().add(activitiesInfo);
+    }
+
+    return bulkActivitiesInfo;
+  }
+
   public SchedulerTypeInfo getSchedulerInfo() {
     try {
       ResourceManager resourceManager = CapacitySchedulerTestUtilities.createResourceManager();

+ 86 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java

@@ -34,6 +34,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -115,9 +116,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeAllocationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationRMQueueAclInfo;
+import org.apache.hadoop.yarn.server.router.webapp.dao.FederationBulkActivitiesInfo;
 import org.apache.hadoop.yarn.server.router.webapp.dao.FederationSchedulerTypeInfo;
 import org.apache.hadoop.yarn.util.LRUCacheHashMap;
 import org.apache.hadoop.yarn.util.MonotonicClock;
@@ -1779,4 +1784,85 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNotNull(cancelResponse);
     Assert.assertEquals(response.getStatus(), Status.OK.getStatusCode());
   }
+
+  @Test
+  public void testGetActivitiesNormal() {
+    ActivitiesInfo activitiesInfo = interceptor.getActivities(null, "1", "DIAGNOSTIC");
+    Assert.assertNotNull(activitiesInfo);
+
+    String nodeId = activitiesInfo.getNodeId();
+    Assert.assertNotNull(nodeId);
+    Assert.assertEquals("1", nodeId);
+
+    String diagnostic = activitiesInfo.getDiagnostic();
+    Assert.assertNotNull(diagnostic);
+    Assert.assertTrue(StringUtils.contains(diagnostic, "Diagnostic"));
+
+    long timestamp = activitiesInfo.getTimestamp();
+    Assert.assertEquals(1673081972L, timestamp);
+
+    List<NodeAllocationInfo> allocationInfos = activitiesInfo.getAllocations();
+    Assert.assertNotNull(allocationInfos);
+    Assert.assertEquals(1, allocationInfos.size());
+  }
+
+  @Test
+  public void testGetActivitiesError() throws Exception {
+    // nodeId is empty
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "'nodeId' must not be empty.",
+        () -> interceptor.getActivities(null, "", "DIAGNOSTIC"));
+
+    // groupBy is empty
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "'groupBy' must not be empty.",
+        () -> interceptor.getActivities(null, "1", ""));
+
+    // groupBy value is wrong
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]",
+        () -> interceptor.getActivities(null, "1", "TEST1"));
+  }
+
+  @Test
+  public void testGetBulkActivitiesNormal() throws InterruptedException {
+    BulkActivitiesInfo bulkActivitiesInfo =
+        interceptor.getBulkActivities(null, "DIAGNOSTIC", 5);
+    Assert.assertNotNull(bulkActivitiesInfo);
+
+    Assert.assertTrue(bulkActivitiesInfo instanceof FederationBulkActivitiesInfo);
+
+    FederationBulkActivitiesInfo federationBulkActivitiesInfo =
+        FederationBulkActivitiesInfo.class.cast(bulkActivitiesInfo);
+    Assert.assertNotNull(federationBulkActivitiesInfo);
+
+    List<BulkActivitiesInfo> activitiesInfos = federationBulkActivitiesInfo.getList();
+    Assert.assertNotNull(activitiesInfos);
+    Assert.assertEquals(4, activitiesInfos.size());
+
+    for (BulkActivitiesInfo activitiesInfo : activitiesInfos) {
+      Assert.assertNotNull(activitiesInfo);
+      List<ActivitiesInfo> activitiesInfoList = activitiesInfo.getActivities();
+      Assert.assertNotNull(activitiesInfoList);
+      Assert.assertEquals(5, activitiesInfoList.size());
+    }
+  }
+
+  @Test
+  public void testGetBulkActivitiesError() throws Exception {
+    // activitiesCount < 0
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "'activitiesCount' must not be negative.",
+        () -> interceptor.getBulkActivities(null, "DIAGNOSTIC", -1));
+
+    // groupBy value is wrong
+    LambdaTestUtils.intercept(YarnRuntimeException.class,
+        "Got invalid groupBy: TEST1, valid groupBy types: [DIAGNOSTIC]",
+        () -> interceptor.getBulkActivities(null, "TEST1", 1));
+
+    // groupBy is empty
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "'groupBy' must not be empty.",
+        () -> interceptor.getBulkActivities(null, "", 1));
+  }
 }