浏览代码

YARN-5467. InputValidator for the FederationStateStore internal APIs. (Giovanni Matteo Fumarola via Subru)

(cherry picked from commit bd44182e70c273ad3371a6f9b458fe0f8a7a6abc)
(cherry picked from commit cfafd173bd854d59effb9256cd4e7b79f6beab5b)
Subru Krishnan 8 年之前
父节点
当前提交
32a8618f39
共有 8 个文件被更改,包括 2007 次插入3 次删除
  1. 30 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
  2. 183 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
  3. 317 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
  4. 144 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
  5. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
  6. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java
  7. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
  8. 1265 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java

+ 30 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -57,6 +57,9 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 
@@ -88,6 +91,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public SubClusterRegisterResponse registerSubCluster(
       SubClusterRegisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterRegisterRequest(request);
     SubClusterInfo subClusterInfo = request.getSubClusterInfo();
     membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
     return SubClusterRegisterResponse.newInstance();
@@ -96,6 +101,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public SubClusterDeregisterResponse deregisterSubCluster(
       SubClusterDeregisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterDeregisterRequest(request);
     SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
     if (subClusterInfo == null) {
       throw new YarnException(
@@ -111,6 +118,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public SubClusterHeartbeatResponse subClusterHeartbeat(
       SubClusterHeartbeatRequest request) throws YarnException {
 
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterHeartbeatRequest(request);
     SubClusterId subClusterId = request.getSubClusterId();
     SubClusterInfo subClusterInfo = membership.get(subClusterId);
 
@@ -129,6 +138,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public GetSubClusterInfoResponse getSubCluster(
       GetSubClusterInfoRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator
+        .validateGetSubClusterInfoRequest(request);
     SubClusterId subClusterId = request.getSubClusterId();
     if (!membership.containsKey(subClusterId)) {
       throw new YarnException(
@@ -157,6 +169,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
       AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateAddApplicationHomeSubClusterRequest(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
 
@@ -172,6 +187,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
       UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateUpdateApplicationHomeSubClusterRequest(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
     if (!applications.containsKey(appId)) {
@@ -186,6 +204,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
       GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateGetApplicationHomeSubClusterRequest(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       throw new YarnException("Application " + appId + " does not exist");
@@ -212,6 +233,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
       DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateDeleteApplicationHomeSubClusterRequest(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       throw new YarnException("Application " + appId + " does not exist");
@@ -224,6 +248,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
       GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator
+        .validateGetSubClusterPolicyConfigurationRequest(request);
     String queue = request.getQueue();
     if (!policies.containsKey(queue)) {
       throw new YarnException("Policy for queue " + queue + " does not exist");
@@ -236,6 +263,9 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
       SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator
+        .validateSetSubClusterPolicyConfigurationRequest(request);
     policies.put(request.getPolicyConfiguration().getQueue(),
         request.getPolicyConfiguration());
     return SetSubClusterPolicyConfigurationResponse.newInstance();

+ 183 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java

@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to
+ * {@code FederationApplicationHomeSubClusterStore}, allows a fail fast
+ * mechanism for invalid user inputs.
+ *
+ */
+public final class FederationApplicationHomeSubClusterStoreInputValidator {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FederationApplicationHomeSubClusterStoreInputValidator.class);
+
+  private FederationApplicationHomeSubClusterStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link AddApplicationHomeSubClusterRequest}
+   * for adding a new application is valid or not.
+   *
+   * @param request the {@link AddApplicationHomeSubClusterRequest} to validate
+   *          against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateAddApplicationHomeSubClusterRequest(
+      AddApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing AddApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an AddApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate ApplicationHomeSubCluster info
+    checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link UpdateApplicationHomeSubClusterRequest}
+   * for updating an application is valid or not.
+   *
+   * @param request the {@link UpdateApplicationHomeSubClusterRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateUpdateApplicationHomeSubClusterRequest(
+      UpdateApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing UpdateApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate ApplicationHomeSubCluster info
+    checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link GetApplicationHomeSubClusterRequest}
+   * for querying application's information is valid or not.
+   *
+   * @param request the {@link GetApplicationHomeSubClusterRequest} to validate
+   *          against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateGetApplicationHomeSubClusterRequest(
+      GetApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing GetApplicationHomeSubCluster Request."
+          + " Please try again by specifying an Application Id information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate application Id
+    checkApplicationId(request.getApplicationId());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link DeleteApplicationHomeSubClusterRequest}
+   * for deleting an application is valid or not.
+   *
+   * @param request the {@link DeleteApplicationHomeSubClusterRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateDeleteApplicationHomeSubClusterRequest(
+      DeleteApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing DeleteApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate application Id
+    checkApplicationId(request.getApplicationId());
+  }
+
+  /**
+   * Validate if the ApplicationHomeSubCluster info are present or not.
+   *
+   * @param applicationHomeSubCluster the information of the application to be
+   *          verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Info
+   *           are invalid
+   */
+  private static void checkApplicationHomeSubCluster(
+      ApplicationHomeSubCluster applicationHomeSubCluster)
+
+      throws FederationStateStoreInvalidInputException {
+    if (applicationHomeSubCluster == null) {
+      String message = "Missing ApplicationHomeSubCluster Info."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // validate application Id
+    checkApplicationId(applicationHomeSubCluster.getApplicationId());
+
+    // validate subcluster Id
+    FederationMembershipStateStoreInputValidator
+        .checkSubClusterId(applicationHomeSubCluster.getHomeSubCluster());
+
+  }
+
+  /**
+   * Validate if the application id is present or not.
+   *
+   * @param appId the id of the application to be verified
+   * @throws FederationStateStoreInvalidInputException if the application Id is
+   *           invalid
+   */
+  private static void checkApplicationId(ApplicationId appId)
+      throws FederationStateStoreInvalidInputException {
+    if (appId == null) {
+      String message = "Missing Application Id."
+          + " Please try again by specifying an Application Id.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+}

+ 317 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java

@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import java.net.URI;
+
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to
+ * {@code FederationMembershipStateStore}, allows a fail fast mechanism for
+ * invalid user inputs.
+ *
+ */
+public final class FederationMembershipStateStoreInputValidator {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FederationMembershipStateStoreInputValidator.class);
+
+  private FederationMembershipStateStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterRegisterRequest} for
+   * registration a new subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterRegisterRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateSubClusterRegisterRequest(
+      SubClusterRegisterRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterRegister Request."
+          + " Please try again by specifying a"
+          + " SubCluster Register Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+
+    }
+
+    // validate subcluster info
+    checkSubClusterInfo(request.getSubClusterInfo());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterDeregisterRequest} for
+   * deregistration a subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterDeregisterRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateSubClusterDeregisterRequest(
+      SubClusterDeregisterRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterDeregister Request."
+          + " Please try again by specifying a"
+          + " SubCluster Deregister Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+    // validate subcluster state
+    checkSubClusterState(request.getState());
+    if (!request.getState().isFinal()) {
+      String message = "Invalid non-final state: " + request.getState();
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterHeartbeatRequest} for
+   * heartbeating a subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterHeartbeatRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateSubClusterHeartbeatRequest(
+      SubClusterHeartbeatRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterHeartbeat Request."
+          + " Please try again by specifying a"
+          + " SubCluster Heartbeat Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+    // validate last heartbeat timestamp
+    checkTimestamp(request.getLastHeartBeat());
+    // validate subcluster capability
+    checkCapability(request.getCapability());
+    // validate subcluster state
+    checkSubClusterState(request.getState());
+
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link GetSubClusterInfoRequest} for querying
+   * subcluster's information is valid or not.
+   *
+   * @param request the {@link GetSubClusterInfoRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateGetSubClusterInfoRequest(
+      GetSubClusterInfoRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing GetSubClusterInfo Request."
+          + " Please try again by specifying a Get SubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+  }
+
+  /**
+   * Validate if the SubCluster Info are present or not.
+   *
+   * @param subClusterInfo the information of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Info
+   *           are invalid
+   */
+  private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
+      throws FederationStateStoreInvalidInputException {
+    if (subClusterInfo == null) {
+      String message = "Missing SubCluster Information."
+          + " Please try again by specifying SubCluster Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(subClusterInfo.getSubClusterId());
+
+    // validate AMRM Service address
+    checkAddress(subClusterInfo.getAMRMServiceAddress());
+    // validate ClientRM Service address
+    checkAddress(subClusterInfo.getClientRMServiceAddress());
+    // validate RMClient Service address
+    checkAddress(subClusterInfo.getRMAdminServiceAddress());
+    // validate RMWeb Service address
+    checkAddress(subClusterInfo.getRMWebServiceAddress());
+
+    // validate last heartbeat timestamp
+    checkTimestamp(subClusterInfo.getLastHeartBeat());
+    // validate last start timestamp
+    checkTimestamp(subClusterInfo.getLastStartTime());
+
+    // validate subcluster state
+    checkSubClusterState(subClusterInfo.getState());
+
+    // validate subcluster capability
+    checkCapability(subClusterInfo.getCapability());
+  }
+
+  /**
+   * Validate if the timestamp is positive or not.
+   *
+   * @param timestamp the timestamp to be verified
+   * @throws FederationStateStoreInvalidInputException if the timestamp is
+   *           invalid
+   */
+  private static void checkTimestamp(long timestamp)
+      throws FederationStateStoreInvalidInputException {
+    if (timestamp < 0) {
+      String message = "Invalid timestamp information."
+          + " Please try again by specifying valid Timestamp Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the Capability is present or not.
+   *
+   * @param capability the capability of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the capability is
+   *           invalid
+   */
+  private static void checkCapability(String capability)
+      throws FederationStateStoreInvalidInputException {
+    if (capability == null || capability.isEmpty()) {
+      String message = "Invalid capability information."
+          + " Please try again by specifying valid Capability Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster Id is present or not.
+   *
+   * @param subClusterId the identifier of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Id is
+   *           invalid
+   */
+  protected static void checkSubClusterId(SubClusterId subClusterId)
+      throws FederationStateStoreInvalidInputException {
+    // check if cluster id is present
+    if (subClusterId == null) {
+      String message = "Missing SubCluster Id information."
+          + " Please try again by specifying Subcluster Id information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // check if cluster id is valid
+    if (subClusterId.getId().isEmpty()) {
+      String message = "Invalid SubCluster Id information."
+          + " Please try again by specifying valid Subcluster Id.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster Address is a valid URL or not.
+   *
+   * @param address the endpoint of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the address is invalid
+   */
+  private static void checkAddress(String address)
+      throws FederationStateStoreInvalidInputException {
+    // Ensure url is not null
+    if (address == null || address.isEmpty()) {
+      String message = "Missing SubCluster Endpoint information."
+          + " Please try again by specifying SubCluster Endpoint information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // Validate url is well formed
+    boolean hasScheme = address.contains("://");
+    URI uri = null;
+    try {
+      uri = hasScheme ? URI.create(address)
+          : URI.create("dummyscheme://" + address);
+    } catch (IllegalArgumentException e) {
+      String message = "The provided SubCluster Endpoint does not contain a"
+          + " valid host:port authority: " + address;
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    String host = uri.getHost();
+    int port = uri.getPort();
+    String path = uri.getPath();
+    if ((host == null) || (port < 0)
+        || (!hasScheme && path != null && !path.isEmpty())) {
+      String message = "The provided SubCluster Endpoint does not contain a"
+          + " valid host:port authority: " + address;
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster State is present or not.
+   *
+   * @param state the state of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster State
+   *           is invalid
+   */
+  private static void checkSubClusterState(SubClusterState state)
+      throws FederationStateStoreInvalidInputException {
+    // check sub-cluster state is not empty
+    if (state == null) {
+      String message = "Missing SubCluster State information."
+          + " Please try again by specifying SubCluster State information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+}

+ 144 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java

@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to {@code FederationPolicyStore}, allows
+ * a fail fast mechanism for invalid user inputs.
+ *
+ */
+public final class FederationPolicyStoreInputValidator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationPolicyStoreInputValidator.class);
+
+  private FederationPolicyStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided
+   * {@link GetSubClusterPolicyConfigurationRequest} for querying policy's
+   * information is valid or not.
+   *
+   * @param request the {@link GetSubClusterPolicyConfigurationRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateGetSubClusterPolicyConfigurationRequest(
+      GetSubClusterPolicyConfigurationRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing GetSubClusterPolicyConfiguration Request."
+          + " Please try again by specifying a policy selection information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate queue id
+    checkQueue(request.getQueue());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided
+   * {@link SetSubClusterPolicyConfigurationRequest} for adding a new policy is
+   * valid or not.
+   *
+   * @param request the {@link SetSubClusterPolicyConfigurationRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is invalid
+   */
+  public static void validateSetSubClusterPolicyConfigurationRequest(
+      SetSubClusterPolicyConfigurationRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing SetSubClusterPolicyConfiguration Request."
+          + " Please try again by specifying an policy insertion information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster policy configuration
+    checkSubClusterPolicyConfiguration(request.getPolicyConfiguration());
+  }
+
+  /**
+   * Validate if the SubClusterPolicyConfiguration is valid or not.
+   *
+   * @param policyConfiguration the policy information to be verified
+   * @throws FederationStateStoreInvalidInputException if the policy information
+   *           are invalid
+   */
+  private static void checkSubClusterPolicyConfiguration(
+      SubClusterPolicyConfiguration policyConfiguration)
+      throws FederationStateStoreInvalidInputException {
+    if (policyConfiguration == null) {
+      String message = "Missing SubClusterPolicyConfiguration."
+          + " Please try again by specifying a SubClusterPolicyConfiguration.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate queue id
+    checkQueue(policyConfiguration.getQueue());
+    // validate policy type
+    checkType(policyConfiguration.getType());
+
+  }
+
+  /**
+   * Validate if the queue id is a valid or not.
+   *
+   * @param queue the queue id of the policy to be verified
+   * @throws FederationStateStoreInvalidInputException if the queue id is
+   *           invalid
+   */
+  private static void checkQueue(String queue)
+      throws FederationStateStoreInvalidInputException {
+    if (queue == null || queue.isEmpty()) {
+      String message = "Missing Queue. Please try again by specifying a Queue.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the policy type is a valid or not.
+   *
+   * @param type the type of the policy to be verified
+   * @throws FederationStateStoreInvalidInputException if the policy is invalid
+   */
+  private static void checkType(String type)
+      throws FederationStateStoreInvalidInputException {
+    if (type == null || type.isEmpty()) {
+      String message = "Missing Policy Type."
+          + " Please try again by specifying a Policy Type.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+}

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown by the {@link FederationMembershipStateStoreInputValidator},
+ * {@link FederationApplicationHomeSubClusterStoreInputValidator},
+ * {@link FederationPolicyStoreInputValidator} if the input is invalid.
+ *
+ */
+public class FederationStateStoreInvalidInputException extends YarnException {
+
+  /**
+   * IDE auto-generated.
+   */
+  private static final long serialVersionUID = -7352144682711430801L;
+
+  public FederationStateStoreInvalidInputException(Throwable cause) {
+    super(cause);
+  }
+
+  public FederationStateStoreInvalidInputException(String message) {
+    super(message);
+  }
+
+  public FederationStateStoreInvalidInputException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java

@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.utils;

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java

@@ -162,9 +162,9 @@ public abstract class FederationStateStoreBaseTest {
         SubClusterRegisterRequest.newInstance(subClusterInfo2));
 
     stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
-        .newInstance(subClusterId1, SubClusterState.SC_RUNNING, ""));
-    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
-        .newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, ""));
+        .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "capability"));
+    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
+        subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
 
     Assert.assertTrue(
         stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))

+ 1265 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java

@@ -0,0 +1,1265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for FederationApplicationInputValidator,
+ * FederationMembershipInputValidator, and FederationPolicyInputValidator.
+ */
+public class TestFederationStateStoreInputValidator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationStateStoreInputValidator.class);
+
+  private static SubClusterId subClusterId;
+  private static String amRMServiceAddress;
+  private static String clientRMServiceAddress;
+  private static String rmAdminServiceAddress;
+  private static String rmWebServiceAddress;
+  private static int lastHeartBeat;
+  private static SubClusterState stateNew;
+  private static SubClusterState stateLost;
+  private static ApplicationId appId;
+  private static int lastStartTime;
+  private static String capability;
+  private static String queue;
+  private static String type;
+  private static ByteBuffer params;
+
+  private static SubClusterId subClusterIdInvalid;
+  private static SubClusterId subClusterIdNull;
+
+  private static int lastHeartBeatNegative;
+  private static int lastStartTimeNegative;
+
+  private static SubClusterState stateNull;
+  private static ApplicationId appIdNull;
+
+  private static String capabilityNull;
+  private static String capabilityEmpty;
+
+  private static String addressNull;
+  private static String addressEmpty;
+  private static String addressWrong;
+  private static String addressWrongPort;
+
+  private static String queueEmpty;
+  private static String queueNull;
+
+  private static String typeEmpty;
+  private static String typeNull;
+
+  @BeforeClass
+  public static void setUp() {
+    subClusterId = SubClusterId.newInstance("abc");
+    amRMServiceAddress = "localhost:8032";
+    clientRMServiceAddress = "localhost:8034";
+    rmAdminServiceAddress = "localhost:8031";
+    rmWebServiceAddress = "localhost:8088";
+    lastHeartBeat = 1000;
+    stateNew = SubClusterState.SC_NEW;
+    stateLost = SubClusterState.SC_LOST;
+    lastStartTime = 1000;
+    capability = "Memory VCores";
+    appId = ApplicationId.newInstance(lastStartTime, 1);
+    queue = "default";
+    type = "random";
+    params = ByteBuffer.allocate(10);
+    params.put((byte) 0xFF);
+
+    subClusterIdInvalid = SubClusterId.newInstance("");
+    subClusterIdNull = null;
+
+    lastHeartBeatNegative = -10;
+    lastStartTimeNegative = -10;
+
+    stateNull = null;
+    appIdNull = null;
+
+    capabilityNull = null;
+    capabilityEmpty = "";
+
+    addressNull = null;
+    addressEmpty = "";
+    addressWrong = "AddressWrong";
+    addressWrongPort = "Address:WrongPort";
+
+    queueEmpty = "";
+    queueNull = null;
+
+    typeEmpty = "";
+    typeNull = null;
+  }
+
+  @Test
+  public void testValidateSubClusterRegisterRequest() {
+
+    // Execution with valid inputs
+
+    SubClusterInfo subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      SubClusterRegisterRequest request = null;
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubClusterRegister Request."));
+    }
+
+    // Execution with null SubClusterInfo
+
+    subClusterInfo = null;
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Information."));
+    }
+
+    // Execution with Null SubClusterId
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterIdNull, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with Invalid SubClusterId
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterIdInvalid, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+
+    // Execution with Null State
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster State information."));
+    }
+
+    // Execution with Null Capability
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capabilityNull);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid capability information."));
+    }
+
+    // Execution with Empty Capability
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capabilityEmpty);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid capability information."));
+    }
+  }
+
+  @Test
+  public void testValidateSubClusterRegisterRequestTimestamp() {
+
+    // Execution with Negative Last Heartbeat
+
+    SubClusterInfo subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeatNegative, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid timestamp information."));
+    }
+
+    // Execution with Negative Last StartTime
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTimeNegative, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid timestamp information."));
+    }
+  }
+
+  @Test
+  public void testValidateSubClusterRegisterRequestAddress() {
+    // Execution with Null Address for amRMServiceAddress
+
+    SubClusterInfo subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, addressNull,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Empty Address for amRMServiceAddress
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId, addressEmpty,
+        clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+        lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Null Address for clientRMServiceAddress
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            addressNull, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Empty Address for clientRMServiceAddress
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            addressEmpty, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Null Address for rmAdminServiceAddress
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, addressNull, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Empty Address for rmAdminServiceAddress
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, addressEmpty, rmWebServiceAddress,
+            lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Null Address for rmWebServiceAddress
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMServiceAddress, clientRMServiceAddress, rmAdminServiceAddress,
+        addressNull, lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+
+    // Execution with Empty Address for rmWebServiceAddress
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMServiceAddress, clientRMServiceAddress, rmAdminServiceAddress,
+        addressEmpty, lastHeartBeat, stateNew, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SubCluster Endpoint information."));
+    }
+  }
+
+  @Test
+  public void testValidateSubClusterRegisterRequestAddressInvalid() {
+
+    // Address is not in host:port format for amRMService
+
+    SubClusterInfo subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, addressWrong,
+            clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Address is not in host:port format for clientRMService
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            addressWrong, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Address is not in host:port format for rmAdminService
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, addressWrong, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Address is not in host:port format for rmWebService
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMServiceAddress, clientRMServiceAddress, rmAdminServiceAddress,
+        addressWrong, lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Port is not an integer for amRMService
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId, addressWrongPort,
+        clientRMServiceAddress, rmAdminServiceAddress, rmWebServiceAddress,
+        lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Port is not an integer for clientRMService
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            addressWrongPort, rmAdminServiceAddress, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Port is not an integer for rmAdminService
+
+    subClusterInfo =
+        SubClusterInfo.newInstance(subClusterId, amRMServiceAddress,
+            clientRMServiceAddress, addressWrongPort, rmWebServiceAddress,
+            lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+    // Port is not an integer for rmWebService
+
+    subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+        amRMServiceAddress, clientRMServiceAddress, rmAdminServiceAddress,
+        addressWrongPort, lastHeartBeat, stateNull, lastStartTime, capability);
+    try {
+      SubClusterRegisterRequest request =
+          SubClusterRegisterRequest.newInstance(subClusterInfo);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterRegisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().contains("valid host:port authority:"));
+    }
+
+  }
+
+  @Test
+  public void testValidateSubClusterDeregisterRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      SubClusterDeregisterRequest request =
+          SubClusterDeregisterRequest.newInstance(subClusterId, stateLost);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      SubClusterDeregisterRequest request = null;
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubClusterDeregister Request."));
+    }
+
+    // Execution with null SubClusterId
+
+    try {
+      SubClusterDeregisterRequest request =
+          SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with invalid SubClusterId
+
+    try {
+      SubClusterDeregisterRequest request = SubClusterDeregisterRequest
+          .newInstance(subClusterIdInvalid, stateLost);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+
+    // Execution with null SubClusterState
+
+    try {
+      SubClusterDeregisterRequest request =
+          SubClusterDeregisterRequest.newInstance(subClusterId, stateNull);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster State information."));
+    }
+
+    // Execution with invalid SubClusterState
+
+    try {
+      SubClusterDeregisterRequest request =
+          SubClusterDeregisterRequest.newInstance(subClusterId, stateNew);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterDeregisterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(e.getMessage().startsWith("Invalid non-final state: "));
+    }
+  }
+
+  @Test
+  public void testSubClusterHeartbeatRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, lastHeartBeat, stateLost, capability);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      SubClusterHeartbeatRequest request = null;
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubClusterHeartbeat Request."));
+    }
+
+    // Execution with null SubClusterId
+
+    try {
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with invalid SubClusterId
+
+    try {
+      SubClusterHeartbeatRequest request =
+          SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid,
+              lastHeartBeat, stateLost, capability);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+
+    // Execution with null SubClusterState
+
+    try {
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, lastHeartBeat, stateNull, capability);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster State information."));
+    }
+
+    // Execution with negative Last Heartbeat
+
+    try {
+      SubClusterHeartbeatRequest request =
+          SubClusterHeartbeatRequest.newInstance(subClusterId,
+              lastHeartBeatNegative, stateLost, capability);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid timestamp information."));
+    }
+
+    // Execution with null Capability
+
+    try {
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid capability information."));
+    }
+
+    // Execution with empty Capability
+
+    try {
+      SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
+          .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty);
+      FederationMembershipStateStoreInputValidator
+          .validateSubClusterHeartbeatRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid capability information."));
+    }
+  }
+
+  @Test
+  public void testGetSubClusterInfoRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      GetSubClusterInfoRequest request =
+          GetSubClusterInfoRequest.newInstance(subClusterId);
+      FederationMembershipStateStoreInputValidator
+          .validateGetSubClusterInfoRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      GetSubClusterInfoRequest request = null;
+      FederationMembershipStateStoreInputValidator
+          .validateGetSubClusterInfoRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing GetSubClusterInfo Request."));
+    }
+
+    // Execution with null SubClusterId
+
+    try {
+      GetSubClusterInfoRequest request =
+          GetSubClusterInfoRequest.newInstance(subClusterIdNull);
+      FederationMembershipStateStoreInputValidator
+          .validateGetSubClusterInfoRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with invalid SubClusterId
+
+    try {
+      GetSubClusterInfoRequest request =
+          GetSubClusterInfoRequest.newInstance(subClusterIdInvalid);
+      FederationMembershipStateStoreInputValidator
+          .validateGetSubClusterInfoRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+  }
+
+  @Test
+  public void testAddApplicationHomeSubClusterRequest() {
+
+    // Execution with valid inputs
+
+    ApplicationHomeSubCluster applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+    try {
+      AddApplicationHomeSubClusterRequest request =
+          AddApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      AddApplicationHomeSubClusterRequest request = null;
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing AddApplicationHomeSubCluster Request."));
+    }
+
+    // Execution with null ApplicationHomeSubCluster
+
+    applicationHomeSubCluster = null;
+    try {
+      AddApplicationHomeSubClusterRequest request =
+          AddApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing ApplicationHomeSubCluster Info."));
+    }
+
+    // Execution with null SubClusterId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterIdNull);
+    try {
+      AddApplicationHomeSubClusterRequest request =
+          AddApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with invalid SubClusterId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterIdInvalid);
+    try {
+      AddApplicationHomeSubClusterRequest request =
+          AddApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+
+    // Execution with Null ApplicationId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appIdNull, subClusterId);
+    try {
+      AddApplicationHomeSubClusterRequest request =
+          AddApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateAddApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
+    }
+  }
+
+  @Test
+  public void testUpdateApplicationHomeSubClusterRequest() {
+
+    // Execution with valid inputs
+
+    ApplicationHomeSubCluster applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+    try {
+      UpdateApplicationHomeSubClusterRequest request =
+          UpdateApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      UpdateApplicationHomeSubClusterRequest request = null;
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing UpdateApplicationHomeSubCluster Request."));
+    }
+
+    // Execution with null ApplicationHomeSubCluster
+
+    applicationHomeSubCluster = null;
+    try {
+      UpdateApplicationHomeSubClusterRequest request =
+          UpdateApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing ApplicationHomeSubCluster Info."));
+    }
+
+    // Execution with null SubClusteId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterIdNull);
+    try {
+      UpdateApplicationHomeSubClusterRequest request =
+          UpdateApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubCluster Id information."));
+    }
+
+    // Execution with invalid SubClusterId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, subClusterIdInvalid);
+    try {
+      UpdateApplicationHomeSubClusterRequest request =
+          UpdateApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      LOG.info(e.getMessage());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Invalid SubCluster Id information."));
+    }
+
+    // Execution with null ApplicationId
+
+    applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appIdNull, subClusterId);
+    try {
+      UpdateApplicationHomeSubClusterRequest request =
+          UpdateApplicationHomeSubClusterRequest
+              .newInstance(applicationHomeSubCluster);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateUpdateApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
+    }
+  }
+
+  @Test
+  public void testGetApplicationHomeSubClusterRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      GetApplicationHomeSubClusterRequest request =
+          GetApplicationHomeSubClusterRequest.newInstance(appId);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateGetApplicationHomeSubClusterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      GetApplicationHomeSubClusterRequest request = null;
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateGetApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing GetApplicationHomeSubCluster Request."));
+    }
+
+    // Execution with null ApplicationId
+
+    try {
+      GetApplicationHomeSubClusterRequest request =
+          GetApplicationHomeSubClusterRequest.newInstance(appIdNull);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateGetApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
+    }
+
+  }
+
+  @Test
+  public void testDeleteApplicationHomeSubClusterRequestNull() {
+
+    // Execution with valid inputs
+
+    try {
+      DeleteApplicationHomeSubClusterRequest request =
+          DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateDeleteApplicationHomeSubClusterRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      DeleteApplicationHomeSubClusterRequest request = null;
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateDeleteApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing DeleteApplicationHomeSubCluster Request."));
+    }
+
+    // Execution with null ApplicationId
+
+    try {
+      DeleteApplicationHomeSubClusterRequest request =
+          DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull);
+      FederationApplicationHomeSubClusterStoreInputValidator
+          .validateDeleteApplicationHomeSubClusterRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
+    }
+
+  }
+
+  @Test
+  public void testGetSubClusterPolicyConfigurationRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      GetSubClusterPolicyConfigurationRequest request =
+          GetSubClusterPolicyConfigurationRequest.newInstance(queue);
+      FederationPolicyStoreInputValidator
+          .validateGetSubClusterPolicyConfigurationRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      GetSubClusterPolicyConfigurationRequest request = null;
+      FederationPolicyStoreInputValidator
+          .validateGetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing GetSubClusterPolicyConfiguration Request."));
+    }
+
+    // Execution with null queue id
+
+    try {
+      GetSubClusterPolicyConfigurationRequest request =
+          GetSubClusterPolicyConfigurationRequest.newInstance(queueNull);
+      FederationPolicyStoreInputValidator
+          .validateGetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
+    }
+
+    // Execution with empty queue id
+
+    try {
+      GetSubClusterPolicyConfigurationRequest request =
+          GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty);
+      FederationPolicyStoreInputValidator
+          .validateGetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
+    }
+
+  }
+
+  @Test
+  public void testSetSubClusterPolicyConfigurationRequest() {
+
+    // Execution with valid inputs
+
+    try {
+      SubClusterPolicyConfiguration policy =
+          SubClusterPolicyConfiguration.newInstance(queue, type, params);
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.fail(e.getMessage());
+    }
+
+    // Execution with null request
+
+    try {
+      SetSubClusterPolicyConfigurationRequest request = null;
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Missing SetSubClusterPolicyConfiguration Request."));
+    }
+
+    // Execution with null SubClusterPolicyConfiguration
+
+    try {
+      SubClusterPolicyConfiguration policy = null;
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(
+          e.getMessage().startsWith("Missing SubClusterPolicyConfiguration."));
+    }
+
+    // Execution with null queue id
+
+    try {
+      SubClusterPolicyConfiguration policy =
+          SubClusterPolicyConfiguration.newInstance(queueNull, type, params);
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
+    }
+
+    // Execution with empty queue id
+
+    try {
+      SubClusterPolicyConfiguration policy =
+          SubClusterPolicyConfiguration.newInstance(queueEmpty, type, params);
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
+    }
+
+    // Execution with null policy type
+
+    try {
+      SubClusterPolicyConfiguration policy =
+          SubClusterPolicyConfiguration.newInstance(queue, typeNull, params);
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));
+    }
+
+    // Execution with empty policy type
+
+    try {
+      SubClusterPolicyConfiguration policy =
+          SubClusterPolicyConfiguration.newInstance(queue, typeEmpty, params);
+      SetSubClusterPolicyConfigurationRequest request =
+          SetSubClusterPolicyConfigurationRequest.newInstance(policy);
+      FederationPolicyStoreInputValidator
+          .validateSetSubClusterPolicyConfigurationRequest(request);
+      Assert.fail();
+    } catch (FederationStateStoreInvalidInputException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));
+    }
+  }
+
+}