Forráskód Böngészése

YARN-11326. [Federation] Add RM FederationStateStoreService Metrics. (#4963)

slfan1989 2 éve
szülő
commit
a258f1f235

+ 121 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationClientMethod.java

@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+/**
+ * Class to define client method,params and arguments.
+ */
+public class FederationClientMethod<R> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationClientMethod.class);
+
+  /**
+   * List of parameters: static and dynamic values, matchings types.
+   */
+  private final Object[] params;
+
+  /**
+   * List of method parameters types, matches parameters.
+   */
+  private final Class<?>[] types;
+
+  /**
+   * String name of the method.
+   */
+  private final String methodName;
+
+  private FederationStateStore stateStoreClient = null;
+
+  private Clock clock = null;
+
+  private Class<R> clazz;
+
+  public FederationClientMethod(String method, Class<?>[] pTypes, Object... pParams)
+      throws YarnException {
+    if (pParams.length != pTypes.length) {
+      throw new YarnException("Invalid parameters for method " + method);
+    }
+
+    this.params = pParams;
+    this.types = Arrays.copyOf(pTypes, pTypes.length);
+    this.methodName = method;
+  }
+
+  public FederationClientMethod(String method, Class pTypes, Object pParams)
+      throws YarnException {
+    this(method, new Class[]{pTypes}, new Object[]{pParams});
+  }
+
+  public FederationClientMethod(String method, Class pTypes, Object pParams, Class<R> rTypes,
+      FederationStateStore fedStateStore, Clock fedClock) throws YarnException {
+    this(method, pTypes, pParams);
+    this.stateStoreClient = fedStateStore;
+    this.clock = fedClock;
+    this.clazz = rTypes;
+  }
+
+  public Object[] getParams() {
+    return Arrays.copyOf(this.params, this.params.length);
+  }
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  /**
+   * Get the calling types for this method.
+   *
+   * @return An array of calling types.
+   */
+  public Class<?>[] getTypes() {
+    return Arrays.copyOf(this.types, this.types.length);
+  }
+
+  /**
+   * We will use the invoke method to call the method in FederationStateStoreService.
+   *
+   * @return The result returned after calling the interface.
+   * @throws YarnException yarn exception.
+   */
+  protected R invoke() throws YarnException {
+    try {
+      long startTime = clock.getTime();
+      Method method = FederationStateStore.class.getMethod(methodName, types);
+      R result = clazz.cast(method.invoke(stateStoreClient, params));
+
+      long stopTime = clock.getTime();
+      FederationStateStoreServiceMetrics.succeededStateStoreServiceCall(
+          methodName, stopTime - startTime);
+      return result;
+    } catch (Exception e) {
+      LOG.error("stateStoreClient call method {} error.", methodName, e);
+      FederationStateStoreServiceMetrics.failedStateStoreServiceCall(methodName);
+      throw new YarnException(e);
+    }
+  }
+}

+ 140 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreService.java

@@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.slf4j.Logger;
@@ -110,6 +112,8 @@ public class FederationStateStoreService extends AbstractService
   private long heartbeatInterval;
   private long heartbeatInitialDelay;
   private RMContext rmContext;
+  private final Clock clock = new MonotonicClock();
+  private FederationStateStoreServiceMetrics metrics;
   private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
   private int cleanUpRetryCountNum;
   private long cleanUpRetrySleepTime;
@@ -171,6 +175,9 @@ public class FederationStateStoreService extends AbstractService
 
     LOG.info("Initialized federation membership service.");
 
+    this.metrics = FederationStateStoreServiceMetrics.getMetrics();
+    LOG.info("Initialized federation statestore service metrics.");
+
     super.serviceInit(conf);
   }
 
@@ -283,154 +290,251 @@ public class FederationStateStoreService extends AbstractService
   @Override
   public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
       GetSubClusterPolicyConfigurationRequest request) throws YarnException {
-    return stateStoreClient.getPolicyConfiguration(request);
+    FederationClientMethod<GetSubClusterPolicyConfigurationResponse> clientMethod =
+        new FederationClientMethod<>("getPolicyConfiguration",
+        GetSubClusterPolicyConfigurationRequest.class, request,
+        GetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
       SetSubClusterPolicyConfigurationRequest request) throws YarnException {
-    return stateStoreClient.setPolicyConfiguration(request);
+    FederationClientMethod<SetSubClusterPolicyConfigurationResponse> clientMethod =
+        new FederationClientMethod<>("setPolicyConfiguration",
+        SetSubClusterPolicyConfigurationRequest.class, request,
+        SetSubClusterPolicyConfigurationResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
       GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
-    return stateStoreClient.getPoliciesConfigurations(request);
+    FederationClientMethod<GetSubClusterPoliciesConfigurationsResponse> clientMethod =
+        new FederationClientMethod<>("getPoliciesConfigurations",
+        GetSubClusterPoliciesConfigurationsRequest.class, request,
+        GetSubClusterPoliciesConfigurationsResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
-  public SubClusterRegisterResponse registerSubCluster(
-      SubClusterRegisterRequest registerSubClusterRequest)
+  public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
       throws YarnException {
-    return stateStoreClient.registerSubCluster(registerSubClusterRequest);
+    FederationClientMethod<SubClusterRegisterResponse> clientMethod =
+        new FederationClientMethod<>("registerSubCluster",
+        SubClusterRegisterRequest.class, request,
+        SubClusterRegisterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
-  public SubClusterDeregisterResponse deregisterSubCluster(
-      SubClusterDeregisterRequest subClusterDeregisterRequest)
+  public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
       throws YarnException {
-    return stateStoreClient.deregisterSubCluster(subClusterDeregisterRequest);
+    FederationClientMethod<SubClusterDeregisterResponse> clientMethod =
+        new FederationClientMethod<>("deregisterSubCluster",
+        SubClusterDeregisterRequest.class, request,
+        SubClusterDeregisterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
-  public SubClusterHeartbeatResponse subClusterHeartbeat(
-      SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+  public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
       throws YarnException {
-    return stateStoreClient.subClusterHeartbeat(subClusterHeartbeatRequest);
+    FederationClientMethod<SubClusterHeartbeatResponse> clientMethod =
+        new FederationClientMethod<>("subClusterHeartbeat",
+        SubClusterHeartbeatRequest.class, request,
+        SubClusterHeartbeatResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
-  public GetSubClusterInfoResponse getSubCluster(
-      GetSubClusterInfoRequest subClusterRequest) throws YarnException {
-    return stateStoreClient.getSubCluster(subClusterRequest);
+  public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
+      throws YarnException {
+    FederationClientMethod<GetSubClusterInfoResponse> clientMethod =
+        new FederationClientMethod<>("getSubCluster",
+        GetSubClusterInfoRequest.class, request,
+        GetSubClusterInfoResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
-  public GetSubClustersInfoResponse getSubClusters(
-      GetSubClustersInfoRequest subClustersRequest) throws YarnException {
-    return stateStoreClient.getSubClusters(subClustersRequest);
+  public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
+      throws YarnException {
+    FederationClientMethod<GetSubClustersInfoResponse> clientMethod =
+        new FederationClientMethod<>("getSubClusters",
+        GetSubClustersInfoRequest.class, request,
+        GetSubClustersInfoResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
       AddApplicationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.addApplicationHomeSubCluster(request);
+    FederationClientMethod<AddApplicationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("addApplicationHomeSubCluster",
+        AddApplicationHomeSubClusterRequest.class, request,
+        AddApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
       UpdateApplicationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.updateApplicationHomeSubCluster(request);
+    // The parameter type handed to FederationClientMethod is used for a reflective
+    // method lookup, so it must be the request type declared by this method.
+    FederationClientMethod<UpdateApplicationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("updateApplicationHomeSubCluster",
+        UpdateApplicationHomeSubClusterRequest.class, request,
+        UpdateApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
       GetApplicationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.getApplicationHomeSubCluster(request);
+    FederationClientMethod<GetApplicationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("getApplicationHomeSubCluster",
+        GetApplicationHomeSubClusterRequest.class, request,
+        GetApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
       GetApplicationsHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.getApplicationsHomeSubCluster(request);
+    FederationClientMethod<GetApplicationsHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("getApplicationsHomeSubCluster",
+        GetApplicationsHomeSubClusterRequest.class, request,
+        GetApplicationsHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
       DeleteApplicationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.deleteApplicationHomeSubCluster(request);
+    FederationClientMethod<DeleteApplicationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("deleteApplicationHomeSubCluster",
+        DeleteApplicationHomeSubClusterRequest.class, request,
+        DeleteApplicationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public AddReservationHomeSubClusterResponse addReservationHomeSubCluster(
       AddReservationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.addReservationHomeSubCluster(request);
+    FederationClientMethod<AddReservationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("addReservationHomeSubCluster",
+        AddReservationHomeSubClusterRequest.class, request,
+        AddReservationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public GetReservationHomeSubClusterResponse getReservationHomeSubCluster(
       GetReservationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.getReservationHomeSubCluster(request);
+    FederationClientMethod<GetReservationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("getReservationHomeSubCluster",
+        GetReservationHomeSubClusterRequest.class, request,
+        GetReservationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
       GetReservationsHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.getReservationsHomeSubCluster(request);
+    FederationClientMethod<GetReservationsHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("getReservationsHomeSubCluster",
+        GetReservationsHomeSubClusterRequest.class, request,
+        GetReservationsHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
       UpdateReservationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.updateReservationHomeSubCluster(request);
+    // The parameter type handed to FederationClientMethod is used for a reflective
+    // method lookup, so it must be the request type declared by this method.
+    FederationClientMethod<UpdateReservationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("updateReservationHomeSubCluster",
+        UpdateReservationHomeSubClusterRequest.class, request,
+        UpdateReservationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
       DeleteReservationHomeSubClusterRequest request) throws YarnException {
-    return stateStoreClient.deleteReservationHomeSubCluster(request);
+    FederationClientMethod<DeleteReservationHomeSubClusterResponse> clientMethod =
+        new FederationClientMethod<>("deleteReservationHomeSubCluster",
+        DeleteReservationHomeSubClusterRequest.class, request,
+        DeleteReservationHomeSubClusterResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterMasterKeyResponse storeNewMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.storeNewMasterKey(request);
+    FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
+        "storeNewMasterKey",
+        RouterMasterKeyRequest.class, request,
+        RouterMasterKeyResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterMasterKeyResponse removeStoredMasterKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.removeStoredMasterKey(request);
+    FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
+        "removeStoredMasterKey",
+        RouterMasterKeyRequest.class, request,
+        RouterMasterKeyResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterMasterKeyResponse getMasterKeyByDelegationKey(RouterMasterKeyRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.getMasterKeyByDelegationKey(request);
+    FederationClientMethod<RouterMasterKeyResponse> clientMethod = new FederationClientMethod<>(
+        "getMasterKeyByDelegationKey",
+        RouterMasterKeyRequest.class, request,
+        RouterMasterKeyResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.storeNewToken(request);
+    FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
+        "storeNewToken",
+        RouterRMTokenRequest.class, request,
+        RouterRMTokenResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterRMTokenResponse updateStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.updateStoredToken(request);
+    FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
+        "updateStoredToken",
+        RouterRMTokenRequest.class, request,
+        RouterRMTokenResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterRMTokenResponse removeStoredToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.removeStoredToken(request);
+    FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
+        "removeStoredToken",
+        RouterRMTokenRequest.class, request,
+        RouterRMTokenResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
   public RouterRMTokenResponse getTokenByRouterStoreToken(RouterRMTokenRequest request)
       throws YarnException, IOException {
-    return stateStoreClient.getTokenByRouterStoreToken(request);
+    FederationClientMethod<RouterRMTokenResponse> clientMethod = new FederationClientMethod<>(
+        "getTokenByRouterStoreToken",
+        RouterRMTokenRequest.class, request,
+        RouterRMTokenResponse.class, stateStoreClient, clock);
+    return clientMethod.invoke();
   }
 
   @Override
@@ -612,5 +716,4 @@ public class FederationStateStoreService extends AbstractService
     }
     return true;
   }
-
-}
+}

+ 196 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/federation/FederationStateStoreServiceMetrics.java

@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+@Metrics(about = "Metrics for FederationStateStoreService", context = "fedr")
+public final class FederationStateStoreServiceMetrics {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreServiceMetrics.class);
+
+  private static final MetricsInfo RECORD_INFO =
+      info("FederationStateStoreServiceMetrics", "Metrics for the RM FederationStateStoreService");
+
+  private static volatile FederationStateStoreServiceMetrics instance = null;
+  private MetricsRegistry registry;
+
+  private final static Method[] STATESTORE_API_METHODS = FederationStateStore.class.getMethods();
+
+  // Map method names to counter objects
+  private static final Map<String, MutableCounterLong> FAILED_CALLS = new HashMap<>();
+  private static final Map<String, MutableRate> SUCCESSFUL_CALLS = new HashMap<>();
+  // Provide quantile latency for each api call.
+  private static final Map<String, MutableQuantiles> QUANTILE_METRICS = new HashMap<>();
+
+  // Error string templates for logging calls from methods not in
+  // FederationStateStore API
+  private static final String UNKOWN_FAIL_ERROR_MSG =
+      "Not recording failed call for unknown FederationStateStore method {}";
+  private static final String UNKNOWN_SUCCESS_ERROR_MSG =
+      "Not recording successful call for unknown FederationStateStore method {}";
+
+  /**
+   * Initialize the singleton instance.
+   *
+   * @return the singleton
+   */
+  public static FederationStateStoreServiceMetrics getMetrics() {
+    synchronized (FederationStateStoreServiceMetrics.class) {
+      if (instance == null) {
+        instance = DefaultMetricsSystem.instance()
+            .register(new FederationStateStoreServiceMetrics());
+      }
+    }
+    return instance;
+  }
+
+  private FederationStateStoreServiceMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "FederationStateStoreServiceMetrics");
+
+    // Create the metrics for each method and put them into the map
+    for (Method m : STATESTORE_API_METHODS) {
+      String methodName = m.getName();
+      LOG.debug("Registering Federation StateStore Service metrics for {}", methodName);
+
+      // This metric only records the number of failed calls; it does not
+      // capture latency information
+      FAILED_CALLS.put(methodName, registry.newCounter(methodName + "NumFailedCalls",
+          "# failed calls to " + methodName, 0L));
+
+      // This metric records both the number and average latency of successful
+      // calls.
+      SUCCESSFUL_CALLS.put(methodName, registry.newRate(methodName + "SuccessfulCalls",
+          "# successful calls and latency(ms) for" + methodName));
+
+      // This metric records the quantile-based latency of each successful call,
+      // re-sampled every 10 seconds.
+      QUANTILE_METRICS.put(methodName, registry.newQuantiles(methodName + "Latency",
+          "Quantile latency (ms) for " + methodName, "ops", "latency", 10));
+    }
+  }
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Total number of successful calls and latency(ms)")
+  private static MutableRate totalSucceededCalls;
+
+  @Metric("Total number of failed StateStore calls")
+  private static MutableCounterLong totalFailedCalls;
+
+  public static void failedStateStoreServiceCall() {
+    String methodName = Thread.currentThread().getStackTrace()[2].getMethodName();
+    MutableCounterLong methodMetric = FAILED_CALLS.get(methodName);
+
+    if (methodMetric == null) {
+      LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
+      return;
+    }
+
+    totalFailedCalls.incr();
+    methodMetric.incr();
+  }
+
+  public static void failedStateStoreServiceCall(String methodName) {
+    MutableCounterLong methodMetric = FAILED_CALLS.get(methodName);
+    if (methodMetric == null) {
+      LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
+      return;
+    }
+    totalFailedCalls.incr();
+    methodMetric.incr();
+  }
+
+  public static void succeededStateStoreServiceCall(long duration) {
+    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
+    if (ArrayUtils.isNotEmpty(stackTraceElements) && stackTraceElements.length > 2) {
+      String methodName = Thread.currentThread().getStackTrace()[2].getMethodName();
+      if(SUCCESSFUL_CALLS.containsKey(methodName)) {
+        succeededStateStoreServiceCall(methodName, duration);
+      } else {
+        LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
+      }
+    } else {
+      LOG.error("stackTraceElements is empty or length < 2.");
+    }
+  }
+
+  public static void succeededStateStoreServiceCall(String methodName, long duration) {
+    if (SUCCESSFUL_CALLS.containsKey(methodName)) {
+      MutableRate methodMetric = SUCCESSFUL_CALLS.get(methodName);
+      MutableQuantiles methodQuantileMetric = QUANTILE_METRICS.get(methodName);
+      if (methodMetric == null || methodQuantileMetric == null) {
+        LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
+        return;
+      }
+      totalSucceededCalls.add(duration);
+      methodMetric.add(duration);
+      methodQuantileMetric.add(duration);
+    }
+  }
+
+  // Getters for unit testing
+  @VisibleForTesting
+  public static long getNumFailedCallsForMethod(String methodName) {
+    return FAILED_CALLS.get(methodName).value();
+  }
+
+  @VisibleForTesting
+  public static long getNumSucceessfulCallsForMethod(String methodName) {
+    return SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public static double getLatencySucceessfulCallsForMethod(String methodName) {
+    return SUCCESSFUL_CALLS.get(methodName).lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public static long getNumFailedCalls() {
+    return totalFailedCalls.value();
+  }
+
+  @VisibleForTesting
+  public static long getNumSucceededCalls() {
+    return totalSucceededCalls.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public static double getLatencySucceededCalls() {
+    return totalSucceededCalls.lastStat().mean();
+  }
+}

+ 172 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationRMStateStoreService.java

@@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
@@ -52,6 +54,17 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHome
 import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
@@ -89,6 +102,7 @@ public class TestFederationRMStateStoreService {
   private long lastHearbeatTS = 0;
   private JSONJAXBContext jc;
   private JSONUnmarshaller unmarshaller;
+  private MockRM mockRM;
 
   @Before
   public void setUp() throws IOException, YarnException, JAXBException {
@@ -97,12 +111,23 @@ public class TestFederationRMStateStoreService {
         JSONConfiguration.mapped().rootUnwrapping(false).build(),
         ClusterMetricsInfo.class);
     unmarshaller = jc.createJSONUnmarshaller();
+
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.FEDERATION_STATESTORE_HEARTBEAT_INITIAL_DELAY, 10);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, subClusterId.getId());
+
+    // set up MockRM
+    mockRM = new MockRM(conf);
+    mockRM.init(conf);
+    mockRM.start();
   }
 
   @After
   public void tearDown() throws Exception {
     unmarshaller = null;
     jc = null;
+    mockRM.stop();
+    mockRM = null;
   }
 
   @Test
@@ -250,10 +275,8 @@ public class TestFederationRMStateStoreService {
 
     // init subCluster Heartbeat,
     // and check that the subCluster is in a running state
-    FederationStateStoreService stateStoreService =
-        rm.getFederationStateStoreService();
-    FederationStateStoreHeartbeat storeHeartbeat =
-        stateStoreService.getStateStoreHeartbeatThread();
+    FederationStateStoreService stateStoreService = rm.getFederationStateStoreService();
+    FederationStateStoreHeartbeat storeHeartbeat = stateStoreService.getStateStoreHeartbeatThread();
     storeHeartbeat.run();
     checkSubClusterInfo(SubClusterState.SC_RUNNING);
 
@@ -482,4 +505,149 @@ public class TestFederationRMStateStoreService {
 
     rmAppMaps.putIfAbsent(application.getApplicationId(), application);
   }
+
+
+  /**
+   * Stores a uniform policy configuration for two queues through the
+   * FederationStateStoreService of the MockRM, then reads them back both
+   * individually (queue1) and as a full list.
+   *
+   * @throws YarnException if a state store call fails.
+   */
+  @Test
+  public void testPolicyConfigurationMethod() throws YarnException {
+
+    // This test case tests 3 methods.
+    // 1.setPolicyConfiguration
+    // 2.getPolicyConfiguration
+    // 3.getPoliciesConfigurations
+    FederationStateStoreService stateStoreService = mockRM.getFederationStateStoreService();
+
+    // set queue basic information (queue1)
+    String queue1 = "queue1";
+    SubClusterPolicyConfiguration requestPolicyConf1 = getUniformPolicy(queue1);
+    SetSubClusterPolicyConfigurationRequest configurationRequest1 =
+        SetSubClusterPolicyConfigurationRequest.newInstance(requestPolicyConf1);
+    // store policy configuration (queue1)
+    stateStoreService.setPolicyConfiguration(configurationRequest1);
+
+    // set queue basic information (queue2)
+    String queue2 = "queue2";
+    SubClusterPolicyConfiguration requestPolicyConf2 = getUniformPolicy(queue2);
+    SetSubClusterPolicyConfigurationRequest configurationRequest2 =
+        SetSubClusterPolicyConfigurationRequest.newInstance(requestPolicyConf2);
+    // store policy configuration (queue2)
+    stateStoreService.setPolicyConfiguration(configurationRequest2);
+
+    // get policy configuration (queue1) and check it equals what was stored
+    GetSubClusterPolicyConfigurationRequest request1 =
+        GetSubClusterPolicyConfigurationRequest.newInstance(queue1);
+    GetSubClusterPolicyConfigurationResponse response =
+        stateStoreService.getPolicyConfiguration(request1);
+    Assert.assertNotNull(response);
+
+    SubClusterPolicyConfiguration responsePolicyConf =
+        response.getPolicyConfiguration();
+    Assert.assertNotNull(responsePolicyConf);
+    Assert.assertEquals(requestPolicyConf1, responsePolicyConf);
+
+    // get all policy configurations; exactly the two stored above are expected
+    GetSubClusterPoliciesConfigurationsRequest policiesRequest1 =
+        GetSubClusterPoliciesConfigurationsRequest.newInstance();
+    GetSubClusterPoliciesConfigurationsResponse policiesResponse1 =
+        stateStoreService.getPoliciesConfigurations(policiesRequest1);
+    Assert.assertNotNull(policiesResponse1);
+
+    List<SubClusterPolicyConfiguration> policiesConfigs = policiesResponse1.getPoliciesConfigs();
+    Assert.assertNotNull(policiesConfigs);
+    Assert.assertEquals(2, policiesConfigs.size());
+    Assert.assertTrue(policiesConfigs.contains(requestPolicyConf1));
+    Assert.assertTrue(policiesConfigs.contains(requestPolicyConf2));
+  }
+
+  /**
+   * Builds the serialized configuration of a UniformBroadcastPolicyManager
+   * for the given queue.
+   *
+   * @param queue name of the queue set on the policy manager.
+   * @return the configuration produced by the manager's serializeConf().
+   * @throws FederationPolicyInitializationException if serialization fails.
+   */
+  public SubClusterPolicyConfiguration getUniformPolicy(String queue)
+      throws FederationPolicyInitializationException {
+    UniformBroadcastPolicyManager policyManager = new UniformBroadcastPolicyManager();
+    policyManager.setQueue(queue);
+    return policyManager.serializeConf();
+  }
+
+  /**
+   * Registers two subClusters through the FederationStateStoreService of the
+   * MockRM, reads one back, and checks that the active-subCluster count goes
+   * from 0 to 1 once subCluster1 heartbeats as SC_RUNNING.
+   *
+   * @throws YarnException if a state store call fails.
+   */
+  @Test
+  public void testSubClusterMethod() throws YarnException {
+
+    // This test case exercises 4 methods.
+    // 1.registerSubCluster
+    // 2.subClusterHeartbeat
+    // 3.getSubCluster
+    // 4.getSubClusters
+    // NOTE: deregisterSubCluster is not called by this test.
+
+    FederationStateStoreService stateStoreService =
+        mockRM.getFederationStateStoreService();
+
+    // registerSubCluster subCluster1
+    SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+    SubClusterInfo subClusterInfo1 = createSubClusterInfo(subClusterId1);
+
+    SubClusterRegisterRequest registerRequest1 =
+        SubClusterRegisterRequest.newInstance(subClusterInfo1);
+    stateStoreService.registerSubCluster(registerRequest1);
+
+    // registerSubCluster subCluster2
+    SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+    SubClusterInfo subClusterInfo2 = createSubClusterInfo(subClusterId2);
+
+    SubClusterRegisterRequest registerRequest2 =
+        SubClusterRegisterRequest.newInstance(subClusterInfo2);
+    stateStoreService.registerSubCluster(registerRequest2);
+
+    // getSubCluster subCluster1
+    GetSubClusterInfoRequest subClusterRequest =
+        GetSubClusterInfoRequest.newInstance(subClusterId1);
+    GetSubClusterInfoResponse subClusterResponse =
+        stateStoreService.getSubCluster(subClusterRequest);
+    Assert.assertNotNull(subClusterResponse);
+
+    // We query subCluster1, we want to get SubClusterInfo of subCluster1
+    SubClusterInfo subClusterInfo1Resp = subClusterResponse.getSubClusterInfo();
+    Assert.assertNotNull(subClusterInfo1Resp);
+    Assert.assertEquals(subClusterInfo1, subClusterInfo1Resp);
+
+    // We call the getSubClusters method and filter the Active SubCluster
+    // subCluster1 and subCluster2 are just registered, they are in NEW state,
+    // so we will get 0 active subclusters
+    GetSubClustersInfoRequest subClustersInfoRequest =
+        GetSubClustersInfoRequest.newInstance(true);
+    GetSubClustersInfoResponse subClustersInfoResp =
+        stateStoreService.getSubClusters(subClustersInfoRequest);
+    Assert.assertNotNull(subClustersInfoResp);
+    List<SubClusterInfo> subClusterInfos = subClustersInfoResp.getSubClusters();
+    Assert.assertNotNull(subClusterInfos);
+    Assert.assertEquals(0, subClusterInfos.size());
+
+    // We let subCluster1 heartbeat and set subCluster1 to Running state
+    SubClusterHeartbeatRequest heartbeatRequest =
+        SubClusterHeartbeatRequest.newInstance(subClusterId1, SubClusterState.SC_RUNNING,
+        "capability");
+    SubClusterHeartbeatResponse heartbeatResponse =
+        stateStoreService.subClusterHeartbeat(heartbeatRequest);
+    Assert.assertNotNull(heartbeatResponse);
+
+    // We call the getSubClusters method again and filter the Active SubCluster
+    // We want to get 1 active SubCluster
+    GetSubClustersInfoRequest subClustersInfoRequest1 =
+        GetSubClustersInfoRequest.newInstance(true);
+    GetSubClustersInfoResponse subClustersInfoResp1 =
+        stateStoreService.getSubClusters(subClustersInfoRequest1);
+    Assert.assertNotNull(subClustersInfoResp1);
+    List<SubClusterInfo> subClusterInfos1 = subClustersInfoResp1.getSubClusters();
+    Assert.assertNotNull(subClusterInfos1);
+    Assert.assertEquals(1, subClusterInfos1.size());
+  }
+
+  /**
+   * Creates a SubClusterInfo for the given id in the SC_NEW state, using
+   * fixed placeholder addresses and the current time as the timestamp.
+   *
+   * @param clusterId id of the subCluster to describe.
+   * @return the newly created SubClusterInfo.
+   */
+  private SubClusterInfo createSubClusterInfo(SubClusterId clusterId) {
+    return SubClusterInfo.newInstance(
+        clusterId,
+        "1.2.3.4:1",   // amRMAddress
+        "1.2.3.4:2",   // clientRMAddress
+        "1.2.3.4:3",   // rmAdminAddress
+        "1.2.3.4:4",   // webAppAddress
+        SubClusterState.SC_NEW,
+        Time.now(),
+        "capability");
+  }
 }

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/federation/TestFederationStateStoreServiceMetrics.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.federation;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link FederationStateStoreServiceMetrics}.
+ */
+public class TestFederationStateStoreServiceMetrics {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationStateStoreServiceMetrics.class);
+
+  // Name used to query the per-method metrics; it equals the name of the mock
+  // methods below. NOTE(review): presumably the metrics class keys its
+  // per-method counters on the calling method's name - confirm before
+  // renaming either side.
+  private static final String REGISTER_SUB_CLUSTER = "registerSubCluster";
+
+  // Not read by any test. NOTE(review): presumably obtained so the metrics
+  // are set up before the first test runs - confirm.
+  private static FederationStateStoreServiceMetrics metrics =
+      FederationStateStoreServiceMetrics.getMetrics();
+
+  // Currently not exercised by any test in this class.
+  private final MockBadFederationStateStoreService badStateStore =
+      new MockBadFederationStateStoreService();
+  private final MockGoodFederationStateStoreService goodStateStore =
+      new MockGoodFederationStateStoreService();
+
+  // Records failures for all calls
+  private static class MockBadFederationStateStoreService {
+    public void registerSubCluster() {
+      LOG.info("Mocked: failed registerSubCluster call");
+      FederationStateStoreServiceMetrics.failedStateStoreServiceCall();
+    }
+  }
+
+  // Records successes for all calls
+  private static class MockGoodFederationStateStoreService {
+    public void registerSubCluster(long duration) {
+      LOG.info("Mocked: successful registerSubCluster call with duration {}", duration);
+      FederationStateStoreServiceMetrics.succeededStateStoreServiceCall(duration);
+    }
+  }
+
+  /**
+   * Checks that the aggregate counters start at zero.
+   *
+   * NOTE(review): the counters are read through static accessors, so this
+   * presumably only holds if no call was recorded earlier in the same JVM
+   * (for example by another test of this class running first) - confirm.
+   */
+  @Test
+  public void testFederationStateStoreServiceMetricInit() {
+    LOG.info("Test: aggregate metrics are initialized correctly");
+    assertEquals(0, FederationStateStoreServiceMetrics.getNumSucceededCalls());
+    assertEquals(0, FederationStateStoreServiceMetrics.getNumFailedCalls());
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * Records two successful registerSubCluster calls (100 and 200) and checks
+   * that both the aggregate and the per-method call counts grow by one per
+   * call, and that the reported latencies are 100 and then 150.
+   */
+  @Test
+  public void testRegisterSubClusterSuccessfulCalls() {
+    LOG.info("Test: Aggregate and method successful calls updated correctly.");
+
+    long totalGoodBefore = FederationStateStoreServiceMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreServiceMetrics.
+        getNumSucceessfulCallsForMethod(REGISTER_SUB_CLUSTER);
+
+    // Call the registerSubCluster method
+    goodStateStore.registerSubCluster(100);
+
+    assertEquals(totalGoodBefore + 1,
+        FederationStateStoreServiceMetrics.getNumSucceededCalls());
+    assertEquals(100, FederationStateStoreServiceMetrics.getLatencySucceededCalls(), 0);
+    // The per-method baseline must be compared with the per-method counter,
+    // not with the aggregate counter.
+    long apiGoodAfterFirst = FederationStateStoreServiceMetrics.
+        getNumSucceessfulCallsForMethod(REGISTER_SUB_CLUSTER);
+    assertEquals(apiGoodBefore + 1, apiGoodAfterFirst);
+    double latencySucceessfulCalls =
+        FederationStateStoreServiceMetrics.getLatencySucceessfulCallsForMethod(
+        REGISTER_SUB_CLUSTER);
+    assertEquals(100, latencySucceessfulCalls, 0);
+
+    LOG.info("Test: Running stats correctly calculated for 2 metrics");
+
+    // Call the registerSubCluster method
+    goodStateStore.registerSubCluster(200);
+
+    assertEquals(totalGoodBefore + 2,
+        FederationStateStoreServiceMetrics.getNumSucceededCalls());
+    assertEquals(150, FederationStateStoreServiceMetrics.getLatencySucceededCalls(), 0);
+    long apiGoodAfterSecond = FederationStateStoreServiceMetrics.
+        getNumSucceessfulCallsForMethod(REGISTER_SUB_CLUSTER);
+    assertEquals(apiGoodBefore + 2, apiGoodAfterSecond);
+    double latencySucceessfulCalls2 =
+        FederationStateStoreServiceMetrics.getLatencySucceessfulCallsForMethod(
+        REGISTER_SUB_CLUSTER);
+    assertEquals(150, latencySucceessfulCalls2, 0);
+  }
+}