Browse Source

YARN-3663. Federation State and Policy Store (DBMS implementation). (Giovanni Matteo Fumarola via curino).

Carlo Curino 8 years ago
parent
commit
be99c1fe2e
22 changed files with 2228 additions and 308 deletions
  1. 1 0
      LICENSE.txt
  2. 12 0
      hadoop-project/pom.xml
  3. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
  5. 20 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
  6. 0 105
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java
  7. 11 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java
  8. 34 47
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
  9. 937 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
  10. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java
  11. 4 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
  12. 5 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
  13. 2 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
  14. 84 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java
  15. 49 25
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
  16. 252 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java
  17. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
  18. 49 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
  19. 73 73
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java
  20. 2 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
  21. 511 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql
  22. 122 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql

+ 1 - 0
LICENSE.txt

@@ -698,6 +698,7 @@ hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/static/jquery-1.10.2.min.js
 hadoop-tools/hadoop-sls/src/main/html/js/thirdparty/jquery.js
 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/webapps/static/jquery
 Apache HBase - Server which contains JQuery minified javascript library version 1.8.3
+Microsoft SQLServer - JDBC version 6.1.0.jre7
 --------------------------------------------------------------------------------
 
 Copyright 2005, 2012, 2013 jQuery Foundation and other contributors, https://jquery.org/

+ 12 - 0
hadoop-project/pom.xml

@@ -100,6 +100,8 @@
 
     <jcache.version>1.0.0</jcache.version>
     <ehcache.version>3.0.3</ehcache.version>
+    <hikari.version>2.4.11</hikari.version>
+    <mssql.version>6.1.0.jre7</mssql.version>
 
     <!-- define the Java language version used by the compiler -->
     <javac.version>1.8</javac.version>
@@ -1284,6 +1286,16 @@
           <artifactId>ehcache</artifactId>
           <version>${ehcache.version}</version>
         </dependency>
+        <dependency>
+          <groupId>com.zaxxer</groupId>
+          <artifactId>HikariCP-java7</artifactId>
+          <version>${hikari.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.microsoft.sqlserver</groupId>
+          <artifactId>mssql-jdbc</artifactId>
+          <version>${mssql.version}</version>
+        </dependency>
     </dependencies>
   </dependencyManagement>
 

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2614,6 +2614,29 @@ public class YarnConfiguration extends Configuration {
 
   public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
 
+  private static final String FEDERATION_STATESTORE_SQL_PREFIX =
+      FEDERATION_PREFIX + "state-store.sql.";
+
+  public static final String FEDERATION_STATESTORE_SQL_USERNAME =
+      FEDERATION_STATESTORE_SQL_PREFIX + "username";
+
+  public static final String FEDERATION_STATESTORE_SQL_PASSWORD =
+      FEDERATION_STATESTORE_SQL_PREFIX + "password";
+
+  public static final String FEDERATION_STATESTORE_SQL_URL =
+      FEDERATION_STATESTORE_SQL_PREFIX + "url";
+
+  public static final String FEDERATION_STATESTORE_SQL_JDBC_CLASS =
+      FEDERATION_STATESTORE_SQL_PREFIX + "jdbc-class";
+
+  public static final String DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS =
+      "org.hsqldb.jdbc.JDBCDataSource";
+
+  public static final String FEDERATION_STATESTORE_SQL_MAXCONNECTIONS =
+      FEDERATION_STATESTORE_SQL_PREFIX + "max-connections";
+
+  public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -90,6 +90,20 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
 
+    // Federation StateStore SQL implementation configs to be ignored
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS);
+
     // Ignore blacklisting nodes for AM failures feature since it is still a
     // "work in progress"
     configurationPropsToSkipCompare.add(YarnConfiguration.

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml

@@ -110,6 +110,26 @@
       <groupId>org.ehcache</groupId>
       <artifactId>ehcache</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.zaxxer</groupId>
+      <artifactId>HikariCP-java7</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.microsoft.sqlserver</groupId>
+      <artifactId>mssql-jdbc</artifactId>
+      <scope>runtime</scope>
+       <exclusions>
+        <exclusion>
+          <groupId>com.microsoft.azure</groupId>
+          <artifactId>azure-keyvault</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>

+ 0 - 105
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreErrorCode.java

@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.store.exception;
-
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * <p>
- * Logical error codes from <code>FederationStateStore</code>.
- * </p>
- */
-@Public
-@Unstable
-public enum FederationStateStoreErrorCode {
-
-  MEMBERSHIP_INSERT_FAIL(1101, "Fail to insert a tuple into Membership table."),
-
-  MEMBERSHIP_DELETE_FAIL(1102, "Fail to delete a tuple from Membership table."),
-
-  MEMBERSHIP_SINGLE_SELECT_FAIL(1103,
-      "Fail to select a tuple from Membership table."),
-
-  MEMBERSHIP_MULTIPLE_SELECT_FAIL(1104,
-      "Fail to select multiple tuples from Membership table."),
-
-  MEMBERSHIP_UPDATE_DEREGISTER_FAIL(1105,
-      "Fail to update/deregister a tuple in Membership table."),
-
-  MEMBERSHIP_UPDATE_HEARTBEAT_FAIL(1106,
-      "Fail to update/heartbeat a tuple in Membership table."),
-
-  APPLICATIONS_INSERT_FAIL(1201,
-      "Fail to insert a tuple into ApplicationsHomeSubCluster table."),
-
-  APPLICATIONS_DELETE_FAIL(1202,
-      "Fail to delete a tuple from ApplicationsHomeSubCluster table"),
-
-  APPLICATIONS_SINGLE_SELECT_FAIL(1203,
-      "Fail to select a tuple from ApplicationsHomeSubCluster table."),
-
-  APPLICATIONS_MULTIPLE_SELECT_FAIL(1204,
-      "Fail to select multiple tuple from ApplicationsHomeSubCluster table."),
-
-  APPLICATIONS_UPDATE_FAIL(1205,
-      "Fail to update a tuple in ApplicationsHomeSubCluster table."),
-
-  POLICY_INSERT_FAIL(1301, "Fail to insert a tuple into Policy table."),
-
-  POLICY_DELETE_FAIL(1302, "Fail to delete a tuple from Membership table."),
-
-  POLICY_SINGLE_SELECT_FAIL(1303, "Fail to select a tuple from Policy table."),
-
-  POLICY_MULTIPLE_SELECT_FAIL(1304,
-      "Fail to select multiple tuples from Policy table."),
-
-  POLICY_UPDATE_FAIL(1305, "Fail to update a tuple in Policy table.");
-
-  private final int id;
-  private final String msg;
-
-  FederationStateStoreErrorCode(int id, String msg) {
-    this.id = id;
-    this.msg = msg;
-  }
-
-  /**
-   * Get the error code related to the FederationStateStore failure.
-   *
-   * @return the error code related to the FederationStateStore failure.
-   */
-  public int getId() {
-    return this.id;
-  }
-
-  /**
-   * Get the error message related to the FederationStateStore failure.
-   *
-   * @return the error message related to the FederationStateStore failure.
-   */
-  public String getMsg() {
-    return this.msg;
-  }
-
-  @Override
-  public String toString() {
-    return "\nError Code: " + this.id + "\nError Message: " + this.msg;
-  }
-}

+ 11 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/exception/FederationStateStoreException.java

@@ -31,15 +31,20 @@ public class FederationStateStoreException extends YarnException {
    */
   private static final long serialVersionUID = -6453353714832159296L;
 
-  private FederationStateStoreErrorCode code;
-
-  public FederationStateStoreException(FederationStateStoreErrorCode code) {
+  public FederationStateStoreException() {
     super();
-    this.code = code;
   }
 
-  public FederationStateStoreErrorCode getCode() {
-    return code;
+  public FederationStateStoreException(String message) {
+    super(message);
+  }
+
+  public FederationStateStoreException(Throwable cause) {
+    super(cause);
+  }
+
+  public FederationStateStoreException(String message, Throwable cause) {
+    super(message, cause);
   }
 
 }

+ 34 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -18,21 +18,17 @@
 package org.apache.hadoop.yarn.server.federation.store.impl;
 
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -52,8 +48,13 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfo
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
@@ -98,16 +99,18 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public SubClusterRegisterResponse registerSubCluster(
       SubClusterRegisterRequest request) throws YarnException {
-    FederationMembershipStateStoreInputValidator
-        .validateSubClusterRegisterRequest(request);
+    FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterInfo subClusterInfo = request.getSubClusterInfo();
 
+    long currentTime =
+        Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
     SubClusterInfo subClusterInfoToSave =
         SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
             subClusterInfo.getAMRMServiceAddress(),
             subClusterInfo.getClientRMServiceAddress(),
             subClusterInfo.getRMAdminServiceAddress(),
-            subClusterInfo.getRMWebServiceAddress(), clock.getTime(),
+            subClusterInfo.getRMWebServiceAddress(), currentTime,
             subClusterInfo.getState(), subClusterInfo.getLastStartTime(),
             subClusterInfo.getCapability());
 
@@ -118,15 +121,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public SubClusterDeregisterResponse deregisterSubCluster(
       SubClusterDeregisterRequest request) throws YarnException {
-    FederationMembershipStateStoreInputValidator
-        .validateSubClusterDeregisterRequest(request);
+    FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
     if (subClusterInfo == null) {
       String errMsg =
           "SubCluster " + request.getSubClusterId().toString() + " not found";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
-          errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     } else {
       subClusterInfo.setState(request.getState());
     }
@@ -138,20 +138,20 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public SubClusterHeartbeatResponse subClusterHeartbeat(
       SubClusterHeartbeatRequest request) throws YarnException {
 
-    FederationMembershipStateStoreInputValidator
-        .validateSubClusterHeartbeatRequest(request);
+    FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterId subClusterId = request.getSubClusterId();
     SubClusterInfo subClusterInfo = membership.get(subClusterId);
 
     if (subClusterInfo == null) {
-      String errMsg = "Subcluster " + subClusterId.toString()
+      String errMsg = "SubCluster " + subClusterId.toString()
           + " does not exist; cannot heartbeat";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
-          errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
-    subClusterInfo.setLastHeartBeat(clock.getTime());
+    long currentTime =
+        Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
+    subClusterInfo.setLastHeartBeat(currentTime);
     subClusterInfo.setState(request.getState());
     subClusterInfo.setCapability(request.getCapability());
 
@@ -162,14 +162,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public GetSubClusterInfoResponse getSubCluster(
       GetSubClusterInfoRequest request) throws YarnException {
 
-    FederationMembershipStateStoreInputValidator
-        .validateGetSubClusterInfoRequest(request);
+    FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterId subClusterId = request.getSubClusterId();
     if (!membership.containsKey(subClusterId)) {
       String errMsg =
-          "Subcluster " + subClusterId.toString() + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL, errMsg);
+          "SubCluster " + subClusterId.toString() + " does not exist";
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     return GetSubClusterInfoResponse.newInstance(membership.get(subClusterId));
@@ -195,8 +193,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
       AddApplicationHomeSubClusterRequest request) throws YarnException {
 
-    FederationApplicationHomeSubClusterStoreInputValidator
-        .validateAddApplicationHomeSubClusterRequest(request);
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
 
@@ -213,14 +210,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
       UpdateApplicationHomeSubClusterRequest request) throws YarnException {
 
-    FederationApplicationHomeSubClusterStoreInputValidator
-        .validateUpdateApplicationHomeSubClusterRequest(request);
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
     if (!applications.containsKey(appId)) {
       String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     applications.put(appId,
@@ -232,14 +227,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
       GetApplicationHomeSubClusterRequest request) throws YarnException {
 
-    FederationApplicationHomeSubClusterStoreInputValidator
-        .validateGetApplicationHomeSubClusterRequest(request);
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
-          errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     return GetApplicationHomeSubClusterResponse.newInstance(
@@ -264,13 +256,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
       DeleteApplicationHomeSubClusterRequest request) throws YarnException {
 
-    FederationApplicationHomeSubClusterStoreInputValidator
-        .validateDeleteApplicationHomeSubClusterRequest(request);
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     applications.remove(appId);
@@ -281,13 +271,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
       GetSubClusterPolicyConfigurationRequest request) throws YarnException {
 
-    FederationPolicyStoreInputValidator
-        .validateGetSubClusterPolicyConfigurationRequest(request);
+    FederationPolicyStoreInputValidator.validate(request);
     String queue = request.getQueue();
     if (!policies.containsKey(queue)) {
       String errMsg = "Policy for queue " + queue + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG,
-          FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     return GetSubClusterPolicyConfigurationResponse
@@ -298,8 +286,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
   public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
       SetSubClusterPolicyConfigurationRequest request) throws YarnException {
 
-    FederationPolicyStoreInputValidator
-        .validateSetSubClusterPolicyConfigurationRequest(request);
+    FederationPolicyStoreInputValidator.validate(request);
     policies.put(request.getPolicyConfiguration().getQueue(),
         request.getPolicyConfiguration());
     return SetSubClusterPolicyConfigurationResponse.newInstance();

+ 937 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java

@@ -0,0 +1,937 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.nio.ByteBuffer;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.zaxxer.hikari.HikariDataSource;
+
+/**
+ * SQL implementation of {@link FederationStateStore}.
+ */
+public class SQLFederationStateStore implements FederationStateStore {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SQLFederationStateStore.class);
+
+  // Stored procedures patterns
+
+  private static final String CALL_SP_REGISTER_SUBCLUSTER =
+      "{call sp_registerSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
+
+  private static final String CALL_SP_DEREGISTER_SUBCLUSTER =
+      "{call sp_deregisterSubCluster(?, ?, ?)}";
+
+  private static final String CALL_SP_GET_SUBCLUSTER =
+      "{call sp_getSubCluster(?, ?, ?, ?, ?, ?, ?, ?, ?)}";
+
+  private static final String CALL_SP_GET_SUBCLUSTERS =
+      "{call sp_getSubClusters()}";
+
+  private static final String CALL_SP_SUBCLUSTER_HEARTBEAT =
+      "{call sp_subClusterHeartbeat(?, ?, ?, ?)}";
+
+  private static final String CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER =
+      "{call sp_addApplicationHomeSubCluster(?, ?, ?, ?)}";
+
+  private static final String CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER =
+      "{call sp_updateApplicationHomeSubCluster(?, ?, ?)}";
+
+  private static final String CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER =
+      "{call sp_deleteApplicationHomeSubCluster(?, ?)}";
+
+  private static final String CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER =
+      "{call sp_getApplicationHomeSubCluster(?, ?)}";
+
+  private static final String CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER =
+      "{call sp_getApplicationsHomeSubCluster()}";
+
+  private static final String CALL_SP_SET_POLICY_CONFIGURATION =
+      "{call sp_setPolicyConfiguration(?, ?, ?, ?)}";
+
+  private static final String CALL_SP_GET_POLICY_CONFIGURATION =
+      "{call sp_getPolicyConfiguration(?, ?, ?)}";
+
+  private static final String CALL_SP_GET_POLICIES_CONFIGURATIONS =
+      "{call sp_getPoliciesConfigurations()}";
+
+  private Calendar utcCalendar =
+      Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+
+  // SQL database configurations
+
+  private String userName;
+  private String password;
+  private String driverClass;
+  private String url;
+  private int maximumPoolSize;
+  private HikariDataSource dataSource = null;
+
+  @Override
+  public void init(Configuration conf) throws YarnException {
+    driverClass =
+        conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
+            YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_JDBC_CLASS);
+    maximumPoolSize =
+        conf.getInt(YarnConfiguration.FEDERATION_STATESTORE_SQL_MAXCONNECTIONS,
+            YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS);
+
+    // An helper method avoids to assign a null value to these property
+    userName = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME);
+    password = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD);
+    url = conf.get(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL);
+
+    try {
+      Class.forName(driverClass);
+    } catch (ClassNotFoundException e) {
+      FederationStateStoreUtils.logAndThrowException(LOG,
+          "Driver class not found.", e);
+    }
+
+    // Create the data source to pool connections in a thread-safe manner
+    dataSource = new HikariDataSource();
+    dataSource.setDataSourceClassName(driverClass);
+    FederationStateStoreUtils.setUsername(dataSource, userName);
+    FederationStateStoreUtils.setPassword(dataSource, password);
+    FederationStateStoreUtils.setProperty(dataSource,
+        FederationStateStoreUtils.FEDERATION_STORE_URL, url);
+    dataSource.setMaximumPoolSize(maximumPoolSize);
+    LOG.info("Initialized connection pool to the Federation StateStore "
+        + "database at address: " + url);
+  }
+
+  @Override
+  public SubClusterRegisterResponse registerSubCluster(
+      SubClusterRegisterRequest registerSubClusterRequest)
+      throws YarnException {
+
+    // Input validator
+    FederationMembershipStateStoreInputValidator
+        .validate(registerSubClusterRequest);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterInfo subClusterInfo =
+        registerSubClusterRequest.getSubClusterInfo();
+    SubClusterId subClusterId = subClusterInfo.getSubClusterId();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_REGISTER_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, subClusterId.getId());
+      cstmt.setString(2, subClusterInfo.getAMRMServiceAddress());
+      cstmt.setString(3, subClusterInfo.getClientRMServiceAddress());
+      cstmt.setString(4, subClusterInfo.getRMAdminServiceAddress());
+      cstmt.setString(5, subClusterInfo.getRMWebServiceAddress());
+      cstmt.setString(6, subClusterInfo.getState().toString());
+      cstmt.setLong(7, subClusterInfo.getLastStartTime());
+      cstmt.setString(8, subClusterInfo.getCapability());
+      cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not add a new subcluster into FederationStateStore
+      if (cstmt.getInt(9) == 0) {
+        String errMsg = "SubCluster " + subClusterId
+            + " was not registered into the StateStore";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(9) != 1) {
+        String errMsg = "Wrong behavior during registration of SubCluster "
+            + subClusterId + " into the StateStore";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info(
+          "Registered the SubCluster " + subClusterId + " into the StateStore");
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to register the SubCluster " + subClusterId
+              + " into the StateStore",
+          e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return SubClusterRegisterResponse.newInstance();
+  }
+
+  @Override
+  public SubClusterDeregisterResponse deregisterSubCluster(
+      SubClusterDeregisterRequest subClusterDeregisterRequest)
+      throws YarnException {
+
+    // Input validator
+    FederationMembershipStateStoreInputValidator
+        .validate(subClusterDeregisterRequest);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterId subClusterId = subClusterDeregisterRequest.getSubClusterId();
+    SubClusterState state = subClusterDeregisterRequest.getState();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_DEREGISTER_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, subClusterId.getId());
+      cstmt.setString(2, state.toString());
+      cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not deregister the subcluster into FederationStateStore
+      if (cstmt.getInt(3) == 0) {
+        String errMsg = "SubCluster " + subClusterId + " not found";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(3) != 1) {
+        String errMsg = "Wrong behavior during deregistration of SubCluster "
+            + subClusterId + " from the StateStore";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
+          + state.toString());
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to deregister the sub-cluster " + subClusterId + " state to "
+              + state.toString(),
+          e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return SubClusterDeregisterResponse.newInstance();
+  }
+
+  @Override
+  public SubClusterHeartbeatResponse subClusterHeartbeat(
+      SubClusterHeartbeatRequest subClusterHeartbeatRequest)
+      throws YarnException {
+
+    // Input validator
+    FederationMembershipStateStoreInputValidator
+        .validate(subClusterHeartbeatRequest);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterId subClusterId = subClusterHeartbeatRequest.getSubClusterId();
+    SubClusterState state = subClusterHeartbeatRequest.getState();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_SUBCLUSTER_HEARTBEAT);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, subClusterId.getId());
+      cstmt.setString(2, state.toString());
+      cstmt.setString(3, subClusterHeartbeatRequest.getCapability());
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not update the subcluster into FederationStateStore
+      if (cstmt.getInt(4) == 0) {
+        String errMsg = "SubCluster " + subClusterId.toString()
+            + " does not exist; cannot heartbeat";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(4) != 1) {
+        String errMsg =
+            "Wrong behavior during the heartbeat of SubCluster " + subClusterId;
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info("Heartbeated the StateStore for the specified SubCluster "
+          + subClusterId);
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to heartbeat the StateStore for the specified SubCluster "
+              + subClusterId,
+          e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return SubClusterHeartbeatResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterInfoResponse getSubCluster(
+      GetSubClusterInfoRequest subClusterRequest) throws YarnException {
+
+    // Input validator
+    FederationMembershipStateStoreInputValidator.validate(subClusterRequest);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = subClusterRequest.getSubClusterId();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTER);
+      cstmt.setString(1, subClusterId.getId());
+
+      // Set the parameters for the stored procedure
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(4, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(5, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(6, java.sql.Types.TIMESTAMP);
+      cstmt.registerOutParameter(7, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(8, java.sql.Types.BIGINT);
+      cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
+
+      // Execute the query
+      cstmt.execute();
+
+      String amRMAddress = cstmt.getString(2);
+      String clientRMAddress = cstmt.getString(3);
+      String rmAdminAddress = cstmt.getString(4);
+      String webAppAddress = cstmt.getString(5);
+
+      Timestamp heartBeatTimeStamp = cstmt.getTimestamp(6, utcCalendar);
+      long lastHeartBeat =
+          heartBeatTimeStamp != null ? heartBeatTimeStamp.getTime() : 0;
+
+      SubClusterState state = SubClusterState.fromString(cstmt.getString(7));
+      long lastStartTime = cstmt.getLong(8);
+      String capability = cstmt.getString(9);
+
+      subClusterInfo = SubClusterInfo.newInstance(subClusterId, amRMAddress,
+          clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
+          lastStartTime, capability);
+
+      // Check if the output it is a valid subcluster
+      try {
+        FederationMembershipStateStoreInputValidator
+            .checkSubClusterInfo(subClusterInfo);
+      } catch (FederationStateStoreInvalidInputException e) {
+        String errMsg =
+            "SubCluster " + subClusterId.toString() + " does not exist";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got the information about the specified SubCluster "
+            + subClusterInfo.toString());
+      }
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the SubCluster information for " + subClusterId, e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return GetSubClusterInfoResponse.newInstance(subClusterInfo);
+  }
+
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest subClustersRequest) throws YarnException {
+    CallableStatement cstmt = null;
+    Connection conn = null;
+    ResultSet rs = null;
+    List<SubClusterInfo> subClusters = new ArrayList<SubClusterInfo>();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
+
+      // Execute the query
+      rs = cstmt.executeQuery();
+
+      while (rs.next()) {
+
+        // Extract the output for each tuple
+        String subClusterName = rs.getString(1);
+        String amRMAddress = rs.getString(2);
+        String clientRMAddress = rs.getString(3);
+        String rmAdminAddress = rs.getString(4);
+        String webAppAddress = rs.getString(5);
+        long lastHeartBeat = rs.getTimestamp(6, utcCalendar).getTime();
+        SubClusterState state = SubClusterState.fromString(rs.getString(7));
+        long lastStartTime = rs.getLong(8);
+        String capability = rs.getString(9);
+
+        SubClusterId subClusterId = SubClusterId.newInstance(subClusterName);
+        SubClusterInfo subClusterInfo = SubClusterInfo.newInstance(subClusterId,
+            amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
+            lastHeartBeat, state, lastStartTime, capability);
+
+        // Check if the output it is a valid subcluster
+        try {
+          FederationMembershipStateStoreInputValidator
+              .checkSubClusterInfo(subClusterInfo);
+        } catch (FederationStateStoreInvalidInputException e) {
+          String errMsg =
+              "SubCluster " + subClusterId.toString() + " is not valid";
+          FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+        }
+
+        // Filter the inactive
+        if (!subClustersRequest.getFilterInactiveSubClusters()
+            || subClusterInfo.getState().isActive()) {
+          subClusters.add(subClusterInfo);
+        }
+      }
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the information for all the SubClusters ", e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+    }
+    return GetSubClustersInfoResponse.newInstance(subClusters);
+  }
+
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+    // Input validator
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    String subClusterHome = null;
+    ApplicationId appId =
+        request.getApplicationHomeSubCluster().getApplicationId();
+    SubClusterId subClusterId =
+        request.getApplicationHomeSubCluster().getHomeSubCluster();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_ADD_APPLICATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, appId.toString());
+      cstmt.setString(2, subClusterId.getId());
+      cstmt.registerOutParameter(3, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      subClusterHome = cstmt.getString(3);
+      SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
+
+      // For failover reason, we check the returned SubClusterId.
+      // If it is equal to the subclusterId we sent, the call added the new
+      // application into FederationStateStore. If the call returns a different
+      // SubClusterId it means we already tried to insert this application but a
+      // component (Router/StateStore/RM) failed during the submission.
+      if (subClusterId.equals(subClusterIdHome)) {
+        // Check the ROWCOUNT value, if it is equal to 0 it means the call
+        // did not add a new application into FederationStateStore
+        if (cstmt.getInt(4) == 0) {
+          String errMsg = "The application " + appId
+              + " was not insert into the StateStore";
+          FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+        }
+        // Check the ROWCOUNT value, if it is different from 1 it means the call
+        // had a wrong behavior. Maybe the database is not set correctly.
+        if (cstmt.getInt(4) != 1) {
+          String errMsg = "Wrong behavior during the insertion of SubCluster "
+              + subClusterId;
+          FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+        }
+
+        LOG.info("Insert into the StateStore the application: " + appId
+            + " in SubCluster:  " + subClusterHome);
+      } else {
+        // Check the ROWCOUNT value, if it is different from 0 it means the call
+        // did edited the table
+        if (cstmt.getInt(4) != 0) {
+          String errMsg =
+              "The application " + appId + " does exist but was overwritten";
+          FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+        }
+        LOG.info("Application: " + appId + " already present with SubCluster:  "
+            + subClusterHome);
+      }
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils
+          .logAndThrowRetriableException(LOG,
+              "Unable to insert the newly generated application "
+                  + request.getApplicationHomeSubCluster().getApplicationId(),
+              e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return AddApplicationHomeSubClusterResponse
+        .newInstance(SubClusterId.newInstance(subClusterHome));
+  }
+
+  @Override
+  public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+      UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+
+    // Input validator
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    ApplicationId appId =
+        request.getApplicationHomeSubCluster().getApplicationId();
+    SubClusterId subClusterId =
+        request.getApplicationHomeSubCluster().getHomeSubCluster();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_UPDATE_APPLICATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, appId.toString());
+      cstmt.setString(2, subClusterId.getId());
+      cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not update the application into FederationStateStore
+      if (cstmt.getInt(3) == 0) {
+        String errMsg = "Application " + appId + " does not exist";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(3) != 1) {
+        String errMsg =
+            "Wrong behavior during the update of SubCluster " + subClusterId;
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info(
+          "Update the SubCluster to {} for application {} in the StateStore",
+          subClusterId, appId);
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils
+          .logAndThrowRetriableException(LOG,
+              "Unable to update the application "
+                  + request.getApplicationHomeSubCluster().getApplicationId(),
+              e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return UpdateApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+    // Input validator
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterId homeRM = null;
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_APPLICATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, request.getApplicationId().toString());
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+
+      // Execute the query
+      cstmt.execute();
+
+      if (cstmt.getString(2) != null) {
+        homeRM = SubClusterId.newInstance(cstmt.getString(2));
+      } else {
+        String errMsg =
+            "Application " + request.getApplicationId() + " does not exist";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Got the information about the specified application  "
+            + request.getApplicationId() + ". The AM is running in " + homeRM);
+      }
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the application information "
+              + "for the specified application " + request.getApplicationId(),
+          e);
+    } finally {
+
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return GetApplicationHomeSubClusterResponse
+        .newInstance(ApplicationHomeSubCluster
+            .newInstance(request.getApplicationId(), homeRM));
+  }
+
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    CallableStatement cstmt = null;
+    Connection conn = null;
+    ResultSet rs = null;
+    List<ApplicationHomeSubCluster> appsHomeSubClusters =
+        new ArrayList<ApplicationHomeSubCluster>();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
+
+      // Execute the query
+      rs = cstmt.executeQuery();
+
+      while (rs.next()) {
+
+        // Extract the output for each tuple
+        String applicationId = rs.getString(1);
+        String homeSubCluster = rs.getString(2);
+
+        appsHomeSubClusters.add(ApplicationHomeSubCluster.newInstance(
+            ApplicationId.fromString(applicationId),
+            SubClusterId.newInstance(homeSubCluster)));
+      }
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the information for all the applications ", e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+    }
+    return GetApplicationsHomeSubClusterResponse
+        .newInstance(appsHomeSubClusters);
+  }
+
+  @Override
+  public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster(
+      DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+
+    // Input validator
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_DELETE_APPLICATION_HOME_SUBCLUSTER);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, request.getApplicationId().toString());
+      cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not delete the application from FederationStateStore
+      if (cstmt.getInt(2) == 0) {
+        String errMsg =
+            "Application " + request.getApplicationId() + " does not exist";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(2) != 1) {
+        String errMsg = "Wrong behavior during deleting the application "
+            + request.getApplicationId();
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info("Delete from the StateStore the application: {}",
+          request.getApplicationId());
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to delete the application " + request.getApplicationId(), e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return DeleteApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    // Input validator
+    FederationPolicyStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+    SubClusterPolicyConfiguration subClusterPolicyConfiguration = null;
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_POLICY_CONFIGURATION);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, request.getQueue());
+      cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
+      cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check if the output it is a valid policy
+      if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
+        subClusterPolicyConfiguration =
+            SubClusterPolicyConfiguration.newInstance(request.getQueue(),
+                cstmt.getString(2), ByteBuffer.wrap(cstmt.getBytes(3)));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Selected from StateStore the policy for the queue: "
+              + subClusterPolicyConfiguration.toString());
+        }
+      } else {
+        String errMsg =
+            "Policy for queue " + request.getQueue() + " does not exist";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to select the policy for the queue :" + request.getQueue(),
+          e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return GetSubClusterPolicyConfigurationResponse
+        .newInstance(subClusterPolicyConfiguration);
+  }
+
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    // Input validator
+    FederationPolicyStoreInputValidator.validate(request);
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+
+    SubClusterPolicyConfiguration policyConf = request.getPolicyConfiguration();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_SET_POLICY_CONFIGURATION);
+
+      // Set the parameters for the stored procedure
+      cstmt.setString(1, policyConf.getQueue());
+      cstmt.setString(2, policyConf.getType());
+      cstmt.setBytes(3, getByteArray(policyConf.getParams()));
+      cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
+
+      // Execute the query
+      cstmt.executeUpdate();
+
+      // Check the ROWCOUNT value, if it is equal to 0 it means the call
+      // did not add a new policy into FederationStateStore
+      if (cstmt.getInt(4) == 0) {
+        String errMsg = "The policy " + policyConf.getQueue()
+            + " was not insert into the StateStore";
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+      // Check the ROWCOUNT value, if it is different from 1 it means the call
+      // had a wrong behavior. Maybe the database is not set correctly.
+      if (cstmt.getInt(4) != 1) {
+        String errMsg =
+            "Wrong behavior during insert the policy " + policyConf.getQueue();
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+
+      LOG.info("Insert into the state store the policy for the queue: "
+          + policyConf.getQueue());
+
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to insert the newly generated policy for the queue :"
+              + policyConf.getQueue(),
+          e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn);
+    }
+    return SetSubClusterPolicyConfigurationResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+
+    CallableStatement cstmt = null;
+    Connection conn = null;
+    ResultSet rs = null;
+    List<SubClusterPolicyConfiguration> policyConfigurations =
+        new ArrayList<SubClusterPolicyConfiguration>();
+
+    try {
+      conn = getConnection();
+      cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
+
+      // Execute the query
+      rs = cstmt.executeQuery();
+
+      while (rs.next()) {
+
+        // Extract the output for each tuple
+        String queue = rs.getString(1);
+        String type = rs.getString(2);
+        byte[] policyInfo = rs.getBytes(3);
+
+        SubClusterPolicyConfiguration subClusterPolicyConfiguration =
+            SubClusterPolicyConfiguration.newInstance(queue, type,
+                ByteBuffer.wrap(policyInfo));
+        policyConfigurations.add(subClusterPolicyConfiguration);
+      }
+    } catch (SQLException e) {
+      FederationStateStoreUtils.logAndThrowRetriableException(LOG,
+          "Unable to obtain the policy information for all the queues.", e);
+    } finally {
+      // Return to the pool the CallableStatement and the Connection
+      FederationStateStoreUtils.returnToPool(LOG, cstmt, conn, rs);
+    }
+
+    return GetSubClusterPoliciesConfigurationsResponse
+        .newInstance(policyConfigurations);
+  }
+
+  @Override
+  public Version getCurrentVersion() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public Version loadVersion() {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (dataSource != null) {
+      dataSource.close();
+    }
+  }
+
+  /**
+   * Get a connection from the DataSource pool.
+   *
+   * @return a connection from the DataSource pool.
+   * @throws SQLException on failure
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  private static byte[] getByteArray(ByteBuffer bb) {
+    byte[] ba = new byte[bb.limit()];
+    bb.get(ba);
+    return ba;
+  }
+
+}

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterState.java

@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.federation.store.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>
@@ -61,4 +63,23 @@ public enum SubClusterState {
     return (this == SC_UNREGISTERED || this == SC_DECOMMISSIONED
         || this == SC_LOST);
   }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SubClusterState.class);
+
+  /**
+   * Convert a string into {@code SubClusterState}.
+   *
+   * @param x the string to convert in SubClusterState
+   * @return the respective {@code SubClusterState}
+   */
+  public static SubClusterState fromString(String x) {
+    try {
+      return SubClusterState.valueOf(x);
+    } catch (Exception e) {
+      LOG.error("Invalid SubCluster State value in the StateStore does not"
+          + " match with the YARN Federation standard.");
+      return null;
+    }
+  }
 }

+ 4 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java

@@ -51,8 +51,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
    *          against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateAddApplicationHomeSubClusterRequest(
-      AddApplicationHomeSubClusterRequest request)
+  public static void validate(AddApplicationHomeSubClusterRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing AddApplicationHomeSubCluster Request."
@@ -75,8 +74,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
    *          validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateUpdateApplicationHomeSubClusterRequest(
-      UpdateApplicationHomeSubClusterRequest request)
+  public static void validate(UpdateApplicationHomeSubClusterRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing UpdateApplicationHomeSubCluster Request."
@@ -99,8 +97,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
    *          against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateGetApplicationHomeSubClusterRequest(
-      GetApplicationHomeSubClusterRequest request)
+  public static void validate(GetApplicationHomeSubClusterRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing GetApplicationHomeSubCluster Request."
@@ -122,8 +119,7 @@ public final class FederationApplicationHomeSubClusterStoreInputValidator {
    *          validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateDeleteApplicationHomeSubClusterRequest(
-      DeleteApplicationHomeSubClusterRequest request)
+  public static void validate(DeleteApplicationHomeSubClusterRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing DeleteApplicationHomeSubCluster Request."

+ 5 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java

@@ -53,8 +53,7 @@ public final class FederationMembershipStateStoreInputValidator {
    * @param request the {@link SubClusterRegisterRequest} to validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateSubClusterRegisterRequest(
-      SubClusterRegisterRequest request)
+  public static void validate(SubClusterRegisterRequest request)
       throws FederationStateStoreInvalidInputException {
 
     // check if the request is present
@@ -79,8 +78,7 @@ public final class FederationMembershipStateStoreInputValidator {
    * @param request the {@link SubClusterDeregisterRequest} to validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateSubClusterDeregisterRequest(
-      SubClusterDeregisterRequest request)
+  public static void validate(SubClusterDeregisterRequest request)
       throws FederationStateStoreInvalidInputException {
 
     // check if the request is present
@@ -111,8 +109,7 @@ public final class FederationMembershipStateStoreInputValidator {
    * @param request the {@link SubClusterHeartbeatRequest} to validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateSubClusterHeartbeatRequest(
-      SubClusterHeartbeatRequest request)
+  public static void validate(SubClusterHeartbeatRequest request)
       throws FederationStateStoreInvalidInputException {
 
     // check if the request is present
@@ -143,8 +140,7 @@ public final class FederationMembershipStateStoreInputValidator {
    * @param request the {@link GetSubClusterInfoRequest} to validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateGetSubClusterInfoRequest(
-      GetSubClusterInfoRequest request)
+  public static void validate(GetSubClusterInfoRequest request)
       throws FederationStateStoreInvalidInputException {
 
     // check if the request is present
@@ -169,7 +165,7 @@ public final class FederationMembershipStateStoreInputValidator {
    * @throws FederationStateStoreInvalidInputException if the SubCluster Info
    *           are invalid
    */
-  private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
+  public static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
       throws FederationStateStoreInvalidInputException {
     if (subClusterInfo == null) {
       String message = "Missing SubCluster Information."

+ 2 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java

@@ -48,8 +48,7 @@ public final class FederationPolicyStoreInputValidator {
    *          validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateGetSubClusterPolicyConfigurationRequest(
-      GetSubClusterPolicyConfigurationRequest request)
+  public static void validate(GetSubClusterPolicyConfigurationRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing GetSubClusterPolicyConfiguration Request."
@@ -72,8 +71,7 @@ public final class FederationPolicyStoreInputValidator {
    *          validate against
    * @throws FederationStateStoreInvalidInputException if the request is invalid
    */
-  public static void validateSetSubClusterPolicyConfigurationRequest(
-      SetSubClusterPolicyConfigurationRequest request)
+  public static void validate(SetSubClusterPolicyConfigurationRequest request)
       throws FederationStateStoreInvalidInputException {
     if (request == null) {
       String message = "Missing SetSubClusterPolicyConfiguration Request."

+ 84 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreUtils.java

@@ -20,16 +20,18 @@ package org.apache.hadoop.yarn.server.federation.store.utils;
 
 import java.sql.CallableStatement;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.zaxxer.hikari.HikariDataSource;
+
 /**
  * Common utility methods used by the store implementations.
  *
@@ -39,19 +41,22 @@ public final class FederationStateStoreUtils {
   public static final Logger LOG =
       LoggerFactory.getLogger(FederationStateStoreUtils.class);
 
+  public final static String FEDERATION_STORE_URL = "url";
+
   private FederationStateStoreUtils() {
   }
 
   /**
-   * Returns the SQL <code>FederationStateStore</code> connection to the pool.
+   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
    *
    * @param log the logger interface
    * @param cstmt the interface used to execute SQL stored procedures
    * @param conn the SQL connection
+   * @param rs the ResultSet interface used to execute SQL stored procedures
    * @throws YarnException on failure
    */
   public static void returnToPool(Logger log, CallableStatement cstmt,
-      Connection conn) throws YarnException {
+      Connection conn, ResultSet rs) throws YarnException {
     if (cstmt != null) {
       try {
         cstmt.close();
@@ -69,6 +74,28 @@ public final class FederationStateStoreUtils {
             e);
       }
     }
+
+    if (rs != null) {
+      try {
+        rs.close();
+      } catch (SQLException e) {
+        logAndThrowException(log, "Exception while trying to close ResultSet",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Returns the SQL <code>FederationStateStore</code> connections to the pool.
+   *
+   * @param log the logger interface
+   * @param cstmt the interface used to execute SQL stored procedures
+   * @param conn the SQL connection
+   * @throws YarnException on failure
+   */
+  public static void returnToPool(Logger log, CallableStatement cstmt,
+      Connection conn) throws YarnException {
+    returnToPool(log, cstmt, conn, null);
   }
 
   /**
@@ -95,28 +122,13 @@ public final class FederationStateStoreUtils {
    * <code>FederationStateStore</code>.
    *
    * @param log the logger interface
-   * @param code FederationStateStoreErrorCode of the error
    * @param errMsg the error message
    * @throws YarnException on failure
    */
-  public static void logAndThrowStoreException(Logger log,
-      FederationStateStoreErrorCode code, String errMsg) throws YarnException {
-    log.error(errMsg + " " + code.toString());
-    throw new FederationStateStoreException(code);
-  }
-
-  /**
-   * Throws an <code>FederationStateStoreException</code> due to an error in
-   * <code>FederationStateStore</code>.
-   *
-   * @param log the logger interface
-   * @param code FederationStateStoreErrorCode of the error
-   * @throws YarnException on failure
-   */
-  public static void logAndThrowStoreException(Logger log,
-      FederationStateStoreErrorCode code) throws YarnException {
-    log.error(code.toString());
-    throw new FederationStateStoreException(code);
+  public static void logAndThrowStoreException(Logger log, String errMsg)
+      throws YarnException {
+    log.error(errMsg);
+    throw new FederationStateStoreException(errMsg);
   }
 
   /**
@@ -129,7 +141,7 @@ public final class FederationStateStoreUtils {
    */
   public static void logAndThrowInvalidInputException(Logger log, String errMsg)
       throws YarnException {
-    LOG.error(errMsg);
+    log.error(errMsg);
     throw new FederationStateStoreInvalidInputException(errMsg);
   }
 
@@ -145,11 +157,58 @@ public final class FederationStateStoreUtils {
   public static void logAndThrowRetriableException(Logger log, String errMsg,
       Throwable t) throws YarnException {
     if (t != null) {
-      LOG.error(errMsg, t);
+      log.error(errMsg, t);
       throw new FederationStateStoreRetriableException(errMsg, t);
     } else {
-      LOG.error(errMsg);
+      log.error(errMsg);
       throw new FederationStateStoreRetriableException(errMsg);
     }
   }
+
+  /**
+   * Sets a specific value for a specific property of
+   * <code>HikariDataSource</code> SQL connections.
+   *
+   * @param dataSource the <code>HikariDataSource</code> connections
+   * @param property the property to set
+   * @param value the value to set
+   */
+  public static void setProperty(HikariDataSource dataSource, String property,
+      String value) {
+    LOG.debug("Setting property {} with value {}", property, value);
+    if (property != null && !property.isEmpty() && value != null) {
+      dataSource.addDataSourceProperty(property, value);
+    }
+  }
+
+  /**
+   * Sets a specific username for <code>HikariDataSource</code> SQL connections.
+   *
+   * @param dataSource the <code>HikariDataSource</code> connections
+   * @param userNameDB the value to set
+   */
+  public static void setUsername(HikariDataSource dataSource,
+      String userNameDB) {
+    if (userNameDB != null) {
+      dataSource.setUsername(userNameDB);
+      LOG.debug("Setting non NULL Username for Store connection");
+    } else {
+      LOG.debug("NULL Username specified for Store connection, so ignoring");
+    }
+  }
+
+  /**
+   * Sets a specific password for <code>HikariDataSource</code> SQL connections.
+   *
+   * @param dataSource the <code>HikariDataSource</code> connections
+   * @param password the value to set
+   */
+  public static void setPassword(HikariDataSource dataSource, String password) {
+    if (password != null) {
+      dataSource.setPassword(password);
+      LOG.debug("Setting non NULL Credentials for Store connection");
+    } else {
+      LOG.debug("NULL Credentials specified for Store connection, so ignoring");
+    }
+  }
 }

+ 49 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java

@@ -19,13 +19,14 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Calendar;
 import java.util.List;
+import java.util.TimeZone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
@@ -87,13 +88,26 @@ public abstract class FederationStateStoreBaseTest {
   @Test
   public void testRegisterSubCluster() throws Exception {
     SubClusterId subClusterId = SubClusterId.newInstance("SC");
+
     SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
 
+    long previousTimeStamp =
+        Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
     SubClusterRegisterResponse result = stateStore.registerSubCluster(
         SubClusterRegisterRequest.newInstance(subClusterInfo));
 
+    long currentTimeStamp =
+        Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
     Assert.assertNotNull(result);
     Assert.assertEquals(subClusterInfo, querySubClusterInfo(subClusterId));
+
+    // The saved heartbeat is between the old one and the current timestamp
+    Assert.assertTrue(querySubClusterInfo(subClusterId)
+        .getLastHeartBeat() <= currentTimeStamp);
+    Assert.assertTrue(querySubClusterInfo(subClusterId)
+        .getLastHeartBeat() >= previousTimeStamp);
   }
 
   @Test
@@ -120,9 +134,7 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.deregisterSubCluster(deregisterRequest);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_DEREGISTER_FAIL,
-          e.getCode());
+      Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
     }
   }
 
@@ -149,9 +161,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.getSubCluster(request).getSubClusterInfo();
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.MEMBERSHIP_SINGLE_SELECT_FAIL,
-          e.getCode());
+      Assert.assertTrue(
+          e.getMessage().startsWith("SubCluster SC does not exist"));
     }
   }
 
@@ -200,13 +211,24 @@ public abstract class FederationStateStoreBaseTest {
     SubClusterId subClusterId = SubClusterId.newInstance("SC");
     registerSubCluster(createSubClusterInfo(subClusterId));
 
+    long previousHeartBeat =
+        querySubClusterInfo(subClusterId).getLastHeartBeat();
+
     SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
         .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
     stateStore.subClusterHeartbeat(heartbeatRequest);
 
+    long currentTimeStamp =
+        Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
+
     Assert.assertEquals(SubClusterState.SC_RUNNING,
         querySubClusterInfo(subClusterId).getState());
-    Assert.assertNotNull(querySubClusterInfo(subClusterId).getLastHeartBeat());
+
+    // The saved heartbeat is between the old one and the current timestamp
+    Assert.assertTrue(querySubClusterInfo(subClusterId)
+        .getLastHeartBeat() <= currentTimeStamp);
+    Assert.assertTrue(querySubClusterInfo(subClusterId)
+        .getLastHeartBeat() >= previousHeartBeat);
   }
 
   @Test
@@ -219,9 +241,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.subClusterHeartbeat(heartbeatRequest);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.MEMBERSHIP_UPDATE_HEARTBEAT_FAIL,
-          e.getCode());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("SubCluster SC does not exist; cannot heartbeat"));
     }
   }
 
@@ -281,9 +302,8 @@ public abstract class FederationStateStoreBaseTest {
       queryApplicationHomeSC(appId);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
-          e.getCode());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId + " does not exist"));
     }
 
   }
@@ -298,8 +318,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.deleteApplicationHomeSubCluster(delRequest);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.APPLICATIONS_DELETE_FAIL, e.getCode());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
     }
   }
 
@@ -331,9 +351,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.getApplicationHomeSubCluster(request);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.APPLICATIONS_SINGLE_SELECT_FAIL,
-          e.getCode());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
     }
   }
 
@@ -397,8 +416,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.updateApplicationHomeSubCluster((updateRequest));
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.APPLICATIONS_UPDATE_FAIL, e.getCode());
+      Assert.assertTrue(e.getMessage()
+          .startsWith("Application " + appId.toString() + " does not exist"));
     }
   }
 
@@ -458,8 +477,8 @@ public abstract class FederationStateStoreBaseTest {
       stateStore.getPolicyConfiguration(request);
       Assert.fail();
     } catch (FederationStateStoreException e) {
-      Assert.assertEquals(
-          FederationStateStoreErrorCode.POLICY_SINGLE_SELECT_FAIL, e.getCode());
+      Assert.assertTrue(
+          e.getMessage().startsWith("Policy for queue Queue does not exist"));
     }
   }
 
@@ -499,8 +518,9 @@ public abstract class FederationStateStoreBaseTest {
 
   private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
       String policyType) {
-    return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
-        ByteBuffer.allocate(1));
+    ByteBuffer bb = ByteBuffer.allocate(100);
+    bb.put((byte) 0x02);
+    return SubClusterPolicyConfiguration.newInstance(queueName, policyType, bb);
   }
 
   private void addApplicationHomeSC(ApplicationId appId,
@@ -558,4 +578,8 @@ public abstract class FederationStateStoreBaseTest {
     this.conf = conf;
   }
 
+  protected Configuration getConf() {
+    return conf;
+  }
+
 }

+ 252 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/HSQLDBFederationStateStore.java

@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HSQLDB implementation of {@link FederationStateStore}.
+ */
+public class HSQLDBFederationStateStore extends SQLFederationStateStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HSQLDBFederationStateStore.class);
+
+  private Connection conn;
+
+  private static final String TABLE_APPLICATIONSHOMESUBCLUSTER =
+      " CREATE TABLE applicationsHomeSubCluster ("
+          + " applicationId varchar(64) NOT NULL,"
+          + " homeSubCluster varchar(256) NOT NULL,"
+          + " CONSTRAINT pk_applicationId PRIMARY KEY (applicationId))";
+
+  private static final String TABLE_MEMBERSHIP =
+      "CREATE TABLE membership ( subClusterId varchar(256) NOT NULL,"
+          + " amRMServiceAddress varchar(256) NOT NULL,"
+          + " clientRMServiceAddress varchar(256) NOT NULL,"
+          + " rmAdminServiceAddress varchar(256) NOT NULL,"
+          + " rmWebServiceAddress varchar(256) NOT NULL,"
+          + " lastHeartBeat datetime NOT NULL, state varchar(32) NOT NULL,"
+          + " lastStartTime bigint NULL, capability varchar(6000) NOT NULL,"
+          + " CONSTRAINT pk_subClusterId PRIMARY KEY (subClusterId))";
+
+  private static final String TABLE_POLICIES =
+      "CREATE TABLE policies ( queue varchar(256) NOT NULL,"
+          + " policyType varchar(256) NOT NULL, params varbinary(512),"
+          + " CONSTRAINT pk_queue PRIMARY KEY (queue))";
+
+  private static final String SP_REGISTERSUBCLUSTER =
+      "CREATE PROCEDURE sp_registerSubCluster("
+          + " IN subClusterId_IN varchar(256),"
+          + " IN amRMServiceAddress_IN varchar(256),"
+          + " IN clientRMServiceAddress_IN varchar(256),"
+          + " IN rmAdminServiceAddress_IN varchar(256),"
+          + " IN rmWebServiceAddress_IN varchar(256),"
+          + " IN state_IN varchar(256),"
+          + " IN lastStartTime_IN bigint, IN capability_IN varchar(6000),"
+          + " OUT rowCount_OUT int)MODIFIES SQL DATA BEGIN ATOMIC"
+          + " DELETE FROM membership WHERE (subClusterId = subClusterId_IN);"
+          + " INSERT INTO membership ( subClusterId,"
+          + " amRMServiceAddress, clientRMServiceAddress,"
+          + " rmAdminServiceAddress, rmWebServiceAddress,"
+          + " lastHeartBeat, state, lastStartTime,"
+          + " capability) VALUES ( subClusterId_IN,"
+          + " amRMServiceAddress_IN, clientRMServiceAddress_IN,"
+          + " rmAdminServiceAddress_IN, rmWebServiceAddress_IN,"
+          + " NOW() AT TIME ZONE INTERVAL '0:00' HOUR TO MINUTE,"
+          + " state_IN, lastStartTime_IN, capability_IN);"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_DEREGISTERSUBCLUSTER =
+      "CREATE PROCEDURE sp_deregisterSubCluster("
+          + " IN subClusterId_IN varchar(256),"
+          + " IN state_IN varchar(64), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " UPDATE membership SET state = state_IN WHERE ("
+          + " subClusterId = subClusterId_IN AND state != state_IN);"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_SUBCLUSTERHEARTBEAT =
+      "CREATE PROCEDURE sp_subClusterHeartbeat("
+          + " IN subClusterId_IN varchar(256), IN state_IN varchar(64),"
+          + " IN capability_IN varchar(6000), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC UPDATE membership"
+          + " SET capability = capability_IN, state = state_IN,"
+          + " lastHeartBeat = NOW() AT TIME ZONE INTERVAL '0:00'"
+          + " HOUR TO MINUTE WHERE subClusterId = subClusterId_IN;"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_GETSUBCLUSTER =
+      "CREATE PROCEDURE sp_getSubCluster( IN subClusterId_IN varchar(256),"
+          + " OUT amRMServiceAddress_OUT varchar(256),"
+          + " OUT clientRMServiceAddress_OUT varchar(256),"
+          + " OUT rmAdminServiceAddress_OUT varchar(256),"
+          + " OUT rmWebServiceAddress_OUT varchar(256),"
+          + " OUT lastHeartBeat_OUT datetime, OUT state_OUT varchar(64),"
+          + " OUT lastStartTime_OUT bigint,"
+          + " OUT capability_OUT varchar(6000))"
+          + " MODIFIES SQL DATA BEGIN ATOMIC SELECT amRMServiceAddress,"
+          + " clientRMServiceAddress,"
+          + " rmAdminServiceAddress, rmWebServiceAddress,"
+          + " lastHeartBeat, state, lastStartTime, capability"
+          + " INTO amRMServiceAddress_OUT, clientRMServiceAddress_OUT,"
+          + " rmAdminServiceAddress_OUT,"
+          + " rmWebServiceAddress_OUT, lastHeartBeat_OUT,"
+          + " state_OUT, lastStartTime_OUT, capability_OUT"
+          + " FROM membership WHERE subClusterId = subClusterId_IN; END";
+
+  private static final String SP_GETSUBCLUSTERS =
+      "CREATE PROCEDURE sp_getSubClusters()"
+          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+          + " DECLARE result CURSOR FOR"
+          + " SELECT subClusterId, amRMServiceAddress, clientRMServiceAddress,"
+          + " rmAdminServiceAddress, rmWebServiceAddress, lastHeartBeat,"
+          + " state, lastStartTime, capability"
+          + " FROM membership; OPEN result; END";
+
+  private static final String SP_ADDAPPLICATIONHOMESUBCLUSTER =
+      "CREATE PROCEDURE sp_addApplicationHomeSubCluster("
+          + " IN applicationId_IN varchar(64),"
+          + " IN homeSubCluster_IN varchar(256),"
+          + " OUT storedHomeSubCluster_OUT varchar(256), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " INSERT INTO applicationsHomeSubCluster "
+          + " (applicationId,homeSubCluster) "
+          + " (SELECT applicationId_IN, homeSubCluster_IN"
+          + " FROM applicationsHomeSubCluster"
+          + " WHERE applicationId = applicationId_IN"
+          + " HAVING COUNT(*) = 0 );"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT;"
+          + " SELECT homeSubCluster INTO storedHomeSubCluster_OUT"
+          + " FROM applicationsHomeSubCluster"
+          + " WHERE applicationId = applicationID_IN; END";
+
+  private static final String SP_UPDATEAPPLICATIONHOMESUBCLUSTER =
+      "CREATE PROCEDURE sp_updateApplicationHomeSubCluster("
+          + " IN applicationId_IN varchar(64),"
+          + " IN homeSubCluster_IN varchar(256), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " UPDATE applicationsHomeSubCluster"
+          + " SET homeSubCluster = homeSubCluster_IN"
+          + " WHERE applicationId = applicationId_IN;"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_GETAPPLICATIONHOMESUBCLUSTER =
+      "CREATE PROCEDURE sp_getApplicationHomeSubCluster("
+          + " IN applicationId_IN varchar(64),"
+          + " OUT homeSubCluster_OUT varchar(256))"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " SELECT homeSubCluster INTO homeSubCluster_OUT"
+          + " FROM applicationsHomeSubCluster"
+          + " WHERE applicationId = applicationID_IN; END";
+
+  private static final String SP_GETAPPLICATIONSHOMESUBCLUSTER =
+      "CREATE PROCEDURE sp_getApplicationsHomeSubCluster()"
+          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+          + " DECLARE result CURSOR FOR"
+          + " SELECT applicationId, homeSubCluster"
+          + " FROM applicationsHomeSubCluster; OPEN result; END";
+
+  private static final String SP_DELETEAPPLICATIONHOMESUBCLUSTER =
+      "CREATE PROCEDURE sp_deleteApplicationHomeSubCluster("
+          + " IN applicationId_IN varchar(64), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " DELETE FROM applicationsHomeSubCluster"
+          + " WHERE applicationId = applicationId_IN;"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_SETPOLICYCONFIGURATION =
+      "CREATE PROCEDURE sp_setPolicyConfiguration("
+          + " IN queue_IN varchar(256), IN policyType_IN varchar(256),"
+          + " IN params_IN varbinary(512), OUT rowCount_OUT int)"
+          + " MODIFIES SQL DATA BEGIN ATOMIC"
+          + " DELETE FROM policies WHERE queue = queue_IN;"
+          + " INSERT INTO policies (queue, policyType, params)"
+          + " VALUES (queue_IN, policyType_IN, params_IN);"
+          + " GET DIAGNOSTICS rowCount_OUT = ROW_COUNT; END";
+
+  private static final String SP_GETPOLICYCONFIGURATION =
+      "CREATE PROCEDURE sp_getPolicyConfiguration("
+          + " IN queue_IN varchar(256), OUT policyType_OUT varchar(256),"
+          + " OUT params_OUT varbinary(512)) MODIFIES SQL DATA BEGIN ATOMIC"
+          + " SELECT policyType, params INTO policyType_OUT, params_OUT"
+          + " FROM policies WHERE queue = queue_IN; END";
+
+  private static final String SP_GETPOLICIESCONFIGURATIONS =
+      "CREATE PROCEDURE sp_getPoliciesConfigurations()"
+          + " MODIFIES SQL DATA DYNAMIC RESULT SETS 1 BEGIN ATOMIC"
+          + " DECLARE result CURSOR FOR"
+          + " SELECT * FROM policies; OPEN result; END";
+
+  @Override
+  public void init(Configuration conf) {
+    try {
+      super.init(conf);
+    } catch (YarnException e1) {
+      LOG.error("ERROR: failed to init HSQLDB " + e1.getMessage());
+    }
+    try {
+      conn = getConnection();
+
+      LOG.info("Database Init: Start");
+
+      conn.prepareStatement(TABLE_APPLICATIONSHOMESUBCLUSTER).execute();
+      conn.prepareStatement(TABLE_MEMBERSHIP).execute();
+      conn.prepareStatement(TABLE_POLICIES).execute();
+
+      conn.prepareStatement(SP_REGISTERSUBCLUSTER).execute();
+      conn.prepareStatement(SP_DEREGISTERSUBCLUSTER).execute();
+      conn.prepareStatement(SP_SUBCLUSTERHEARTBEAT).execute();
+      conn.prepareStatement(SP_GETSUBCLUSTER).execute();
+      conn.prepareStatement(SP_GETSUBCLUSTERS).execute();
+
+      conn.prepareStatement(SP_ADDAPPLICATIONHOMESUBCLUSTER).execute();
+      conn.prepareStatement(SP_UPDATEAPPLICATIONHOMESUBCLUSTER).execute();
+      conn.prepareStatement(SP_GETAPPLICATIONHOMESUBCLUSTER).execute();
+      conn.prepareStatement(SP_GETAPPLICATIONSHOMESUBCLUSTER).execute();
+      conn.prepareStatement(SP_DELETEAPPLICATIONHOMESUBCLUSTER).execute();
+
+      conn.prepareStatement(SP_SETPOLICYCONFIGURATION).execute();
+      conn.prepareStatement(SP_GETPOLICYCONFIGURATION).execute();
+      conn.prepareStatement(SP_GETPOLICIESCONFIGURATIONS).execute();
+
+      LOG.info("Database Init: Complete");
+      conn.close();
+    } catch (SQLException e) {
+      LOG.error("ERROR: failed to inizialize HSQLDB " + e.getMessage());
+    }
+  }
+
+  public void closeConnection() {
+    try {
+      conn.close();
+    } catch (SQLException e) {
+      LOG.error(
+          "ERROR: failed to close connection to HSQLDB DB " + e.getMessage());
+    }
+  }
+
+}

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java

@@ -28,7 +28,8 @@ public class TestMemoryFederationStateStore
 
   @Override
   protected FederationStateStore createStateStore() {
-    super.setConf(new Configuration());
+    Configuration conf = new Configuration();
+    super.setConf(conf);
     return new MemoryFederationStateStore();
   }
 }

+ 49 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+
+/**
+ * Unit tests for SQLFederationStateStore.
+ */
+public class TestSQLFederationStateStore extends FederationStateStoreBaseTest {
+
+  private static final String HSQLDB_DRIVER = "org.hsqldb.jdbc.JDBCDataSource";
+  private static final String DATABASE_URL = "jdbc:hsqldb:mem:state";
+  private static final String DATABASE_USERNAME = "SA";
+  private static final String DATABASE_PASSWORD = "";
+
+  @Override
+  protected FederationStateStore createStateStore() {
+
+    YarnConfiguration conf = new YarnConfiguration();
+
+    conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS,
+        HSQLDB_DRIVER);
+    conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_USERNAME,
+        DATABASE_USERNAME);
+    conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_PASSWORD,
+        DATABASE_PASSWORD);
+    conf.set(YarnConfiguration.FEDERATION_STATESTORE_SQL_URL,
+        DATABASE_URL + System.currentTimeMillis());
+    super.setConf(conf);
+    return new HSQLDBFederationStateStore();
+  }
+}

+ 73 - 73
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/utils/TestFederationStateStoreInputValidator.java

@@ -145,7 +145,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -155,7 +155,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       SubClusterRegisterRequest request = null;
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -170,7 +170,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -188,7 +188,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -206,7 +206,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -224,7 +224,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -242,7 +242,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -257,7 +257,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -276,7 +276,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -294,7 +294,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -315,7 +315,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -332,7 +332,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -350,7 +350,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -368,7 +368,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -386,7 +386,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -404,7 +404,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -421,7 +421,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -438,7 +438,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -460,7 +460,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -477,7 +477,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -494,7 +494,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -510,7 +510,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -526,7 +526,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -543,7 +543,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -560,7 +560,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -576,7 +576,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterRegisterRequest request =
           SubClusterRegisterRequest.newInstance(subClusterInfo);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterRegisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -594,7 +594,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterDeregisterRequest request =
           SubClusterDeregisterRequest.newInstance(subClusterId, stateLost);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -604,7 +604,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       SubClusterDeregisterRequest request = null;
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -618,7 +618,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterDeregisterRequest request =
           SubClusterDeregisterRequest.newInstance(subClusterIdNull, stateLost);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -632,7 +632,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterDeregisterRequest request = SubClusterDeregisterRequest
           .newInstance(subClusterIdInvalid, stateLost);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -646,7 +646,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterDeregisterRequest request =
           SubClusterDeregisterRequest.newInstance(subClusterId, stateNull);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -660,7 +660,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterDeregisterRequest request =
           SubClusterDeregisterRequest.newInstance(subClusterId, stateNew);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterDeregisterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -677,7 +677,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
           .newInstance(subClusterId, lastHeartBeat, stateLost, capability);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -687,7 +687,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       SubClusterHeartbeatRequest request = null;
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -701,7 +701,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
           .newInstance(subClusterIdNull, lastHeartBeat, stateLost, capability);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -716,7 +716,7 @@ public class TestFederationStateStoreInputValidator {
           SubClusterHeartbeatRequest.newInstance(subClusterIdInvalid,
               lastHeartBeat, stateLost, capability);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -730,7 +730,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
           .newInstance(subClusterId, lastHeartBeat, stateNull, capability);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -745,7 +745,7 @@ public class TestFederationStateStoreInputValidator {
           SubClusterHeartbeatRequest.newInstance(subClusterId,
               lastHeartBeatNegative, stateLost, capability);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -759,7 +759,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
           .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityNull);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -773,7 +773,7 @@ public class TestFederationStateStoreInputValidator {
       SubClusterHeartbeatRequest request = SubClusterHeartbeatRequest
           .newInstance(subClusterId, lastHeartBeat, stateLost, capabilityEmpty);
       FederationMembershipStateStoreInputValidator
-          .validateSubClusterHeartbeatRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -791,7 +791,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterInfoRequest request =
           GetSubClusterInfoRequest.newInstance(subClusterId);
       FederationMembershipStateStoreInputValidator
-          .validateGetSubClusterInfoRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -801,7 +801,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       GetSubClusterInfoRequest request = null;
       FederationMembershipStateStoreInputValidator
-          .validateGetSubClusterInfoRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -815,7 +815,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterInfoRequest request =
           GetSubClusterInfoRequest.newInstance(subClusterIdNull);
       FederationMembershipStateStoreInputValidator
-          .validateGetSubClusterInfoRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -829,7 +829,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterInfoRequest request =
           GetSubClusterInfoRequest.newInstance(subClusterIdInvalid);
       FederationMembershipStateStoreInputValidator
-          .validateGetSubClusterInfoRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -850,7 +850,7 @@ public class TestFederationStateStoreInputValidator {
           AddApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -860,7 +860,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       AddApplicationHomeSubClusterRequest request = null;
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -875,7 +875,7 @@ public class TestFederationStateStoreInputValidator {
           AddApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(
@@ -891,7 +891,7 @@ public class TestFederationStateStoreInputValidator {
           AddApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -908,7 +908,7 @@ public class TestFederationStateStoreInputValidator {
           AddApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -925,7 +925,7 @@ public class TestFederationStateStoreInputValidator {
           AddApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateAddApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -944,7 +944,7 @@ public class TestFederationStateStoreInputValidator {
           UpdateApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -954,7 +954,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       UpdateApplicationHomeSubClusterRequest request = null;
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -969,7 +969,7 @@ public class TestFederationStateStoreInputValidator {
           UpdateApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(
@@ -985,7 +985,7 @@ public class TestFederationStateStoreInputValidator {
           UpdateApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -1002,7 +1002,7 @@ public class TestFederationStateStoreInputValidator {
           UpdateApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       LOG.info(e.getMessage());
@@ -1019,7 +1019,7 @@ public class TestFederationStateStoreInputValidator {
           UpdateApplicationHomeSubClusterRequest
               .newInstance(applicationHomeSubCluster);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateUpdateApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1035,7 +1035,7 @@ public class TestFederationStateStoreInputValidator {
       GetApplicationHomeSubClusterRequest request =
           GetApplicationHomeSubClusterRequest.newInstance(appId);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateGetApplicationHomeSubClusterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -1045,7 +1045,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       GetApplicationHomeSubClusterRequest request = null;
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateGetApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -1058,7 +1058,7 @@ public class TestFederationStateStoreInputValidator {
       GetApplicationHomeSubClusterRequest request =
           GetApplicationHomeSubClusterRequest.newInstance(appIdNull);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateGetApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1075,7 +1075,7 @@ public class TestFederationStateStoreInputValidator {
       DeleteApplicationHomeSubClusterRequest request =
           DeleteApplicationHomeSubClusterRequest.newInstance(appId);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateDeleteApplicationHomeSubClusterRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -1085,7 +1085,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       DeleteApplicationHomeSubClusterRequest request = null;
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateDeleteApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -1098,7 +1098,7 @@ public class TestFederationStateStoreInputValidator {
       DeleteApplicationHomeSubClusterRequest request =
           DeleteApplicationHomeSubClusterRequest.newInstance(appIdNull);
       FederationApplicationHomeSubClusterStoreInputValidator
-          .validateDeleteApplicationHomeSubClusterRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Application Id."));
@@ -1115,7 +1115,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterPolicyConfigurationRequest request =
           GetSubClusterPolicyConfigurationRequest.newInstance(queue);
       FederationPolicyStoreInputValidator
-          .validateGetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -1125,7 +1125,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       GetSubClusterPolicyConfigurationRequest request = null;
       FederationPolicyStoreInputValidator
-          .validateGetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -1138,7 +1138,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterPolicyConfigurationRequest request =
           GetSubClusterPolicyConfigurationRequest.newInstance(queueNull);
       FederationPolicyStoreInputValidator
-          .validateGetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1150,7 +1150,7 @@ public class TestFederationStateStoreInputValidator {
       GetSubClusterPolicyConfigurationRequest request =
           GetSubClusterPolicyConfigurationRequest.newInstance(queueEmpty);
       FederationPolicyStoreInputValidator
-          .validateGetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1169,7 +1169,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.fail(e.getMessage());
     }
@@ -1179,7 +1179,7 @@ public class TestFederationStateStoreInputValidator {
     try {
       SetSubClusterPolicyConfigurationRequest request = null;
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage()
@@ -1193,7 +1193,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(
@@ -1208,7 +1208,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1222,7 +1222,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Queue."));
@@ -1236,7 +1236,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));
@@ -1250,7 +1250,7 @@ public class TestFederationStateStoreInputValidator {
       SetSubClusterPolicyConfigurationRequest request =
           SetSubClusterPolicyConfigurationRequest.newInstance(policy);
       FederationPolicyStoreInputValidator
-          .validateSetSubClusterPolicyConfigurationRequest(request);
+          .validate(request);
       Assert.fail();
     } catch (FederationStateStoreInvalidInputException e) {
       Assert.assertTrue(e.getMessage().startsWith("Missing Policy Type."));

+ 2 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java

@@ -24,7 +24,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreErrorCode;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
@@ -82,10 +81,8 @@ public class TestFederationStateStoreFacadeRetry {
     conf = new Configuration();
     conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
-    RetryAction action = policy.shouldRetry(
-        new FederationStateStoreException(
-            FederationStateStoreErrorCode.APPLICATIONS_INSERT_FAIL),
-        0, 0, false);
+    RetryAction action = policy
+        .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false);
     Assert.assertEquals(RetryAction.FAIL.action, action.action);
   }
 

+ 511 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreStoreProcs.sql

@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+USE [FederationStateStore]
+GO
+
+IF OBJECT_ID ( '[sp_addApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_addApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_addApplicationHomeSubCluster]
+    @applicationId VARCHAR(64),
+    @homeSubCluster VARCHAR(256),
+    @storedHomeSubCluster VARCHAR(256) OUTPUT,
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+            -- If application to sub-cluster map doesn't exist, insert it.
+            -- Otherwise don't change the current mapping.
+            IF NOT EXISTS (SELECT TOP 1 *
+                       FROM [dbo].[applicationsHomeSubCluster]
+                       WHERE [applicationId] = @applicationId)
+
+                INSERT INTO [dbo].[applicationsHomeSubCluster] (
+                    [applicationId],
+                    [homeSubCluster])
+                VALUES (
+                    @applicationId,
+                    @homeSubCluster);
+            -- End of the IF block
+
+            SELECT @rowCount = @@ROWCOUNT;
+
+            SELECT @storedHomeSubCluster = [homeSubCluster]
+            FROM [dbo].[applicationsHomeSubCluster]
+            WHERE [applicationId] = @applicationId;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_updateApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_updateApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_updateApplicationHomeSubCluster]
+    @applicationId VARCHAR(64),
+    @homeSubCluster VARCHAR(256),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            UPDATE [dbo].[applicationsHomeSubCluster]
+            SET [homeSubCluster] = @homeSubCluster
+            WHERE [applicationId] = @applicationid;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getApplicationsHomeSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getApplicationsHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getApplicationsHomeSubCluster]
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        SELECT [applicationId], [homeSubCluster], [createTime]
+        FROM [dbo].[applicationsHomeSubCluster]
+    END TRY
+
+    BEGIN CATCH
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getApplicationHomeSubCluster]
+    @applicationId VARCHAR(64),
+    @homeSubCluster VARCHAR(256) OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+
+        SELECT @homeSubCluster = [homeSubCluster]
+        FROM [dbo].[applicationsHomeSubCluster]
+        WHERE [applicationId] = @applicationid;
+
+    END TRY
+
+    BEGIN CATCH
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_deleteApplicationHomeSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_deleteApplicationHomeSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_deleteApplicationHomeSubCluster]
+    @applicationId VARCHAR(64),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            DELETE FROM [dbo].[applicationsHomeSubCluster]
+            WHERE [applicationId] = @applicationId;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_registerSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_registerSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_registerSubCluster]
+    @subClusterId VARCHAR(256),
+    @amRMServiceAddress VARCHAR(256),
+    @clientRMServiceAddress VARCHAR(256),
+    @rmAdminServiceAddress VARCHAR(256),
+    @rmWebServiceAddress VARCHAR(256),
+    @state VARCHAR(32),
+    @lastStartTime BIGINT,
+    @capability VARCHAR(6000),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            DELETE FROM [dbo].[membership]
+            WHERE [subClusterId] = @subClusterId;
+            INSERT INTO [dbo].[membership] (
+                [subClusterId],
+                [amRMServiceAddress],
+                [clientRMServiceAddress],
+                [rmAdminServiceAddress],
+                [rmWebServiceAddress],
+                [lastHeartBeat],
+                [state],
+                [lastStartTime],
+                [capability] )
+            VALUES (
+                @subClusterId,
+                @amRMServiceAddress,
+                @clientRMServiceAddress,
+                @rmAdminServiceAddress,
+                @rmWebServiceAddress,
+                GETUTCDATE(),
+                @state,
+                @lastStartTime,
+                @capability);
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubClusters]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getSubClusters];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubClusters]
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        SELECT [subClusterId], [amRMServiceAddress], [clientRMServiceAddress],
+               [rmAdminServiceAddress], [rmWebServiceAddress], [lastHeartBeat],
+               [state], [lastStartTime], [capability]
+        FROM [dbo].[membership]
+    END TRY
+
+    BEGIN CATCH
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getSubCluster]
+    @subClusterId VARCHAR(256),
+    @amRMServiceAddress VARCHAR(256) OUTPUT,
+    @clientRMServiceAddress VARCHAR(256) OUTPUT,
+    @rmAdminServiceAddress VARCHAR(256) OUTPUT,
+    @rmWebServiceAddress VARCHAR(256) OUTPUT,
+    @lastHeartbeat DATETIME2 OUTPUT,
+    @state VARCHAR(256) OUTPUT,
+    @lastStartTime BIGINT OUTPUT,
+    @capability VARCHAR(6000) OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            SELECT @subClusterId = [subClusterId],
+                   @amRMServiceAddress = [amRMServiceAddress],
+                   @clientRMServiceAddress = [clientRMServiceAddress],
+                   @rmAdminServiceAddress = [rmAdminServiceAddress],
+                   @rmWebServiceAddress = [rmWebServiceAddress],
+                   @lastHeartBeat = [lastHeartBeat],
+                   @state = [state],
+                   @lastStartTime = [lastStartTime],
+                   @capability = [capability]
+            FROM [dbo].[membership]
+            WHERE [subClusterId] = @subClusterId
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+
+IF OBJECT_ID ( '[sp_subClusterHeartbeat]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_subClusterHeartbeat];
+GO
+
+CREATE PROCEDURE [dbo].[sp_subClusterHeartbeat]
+    @subClusterId VARCHAR(256),
+    @state VARCHAR(256),
+    @capability VARCHAR(6000),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            UPDATE [dbo].[membership]
+            SET [state] = @state,
+                [lastHeartbeat] = GETUTCDATE(),
+                [capability] = @capability
+            WHERE [subClusterId] = @subClusterId;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_deregisterSubCluster]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_deregisterSubCluster];
+GO
+
+CREATE PROCEDURE [dbo].[sp_deregisterSubCluster]
+    @subClusterId VARCHAR(256),
+    @state VARCHAR(256),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            UPDATE [dbo].[membership]
+            SET [state] = @state
+            WHERE [subClusterId] = @subClusterId;
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_setPolicyConfiguration]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_setPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_setPolicyConfiguration]
+    @queue VARCHAR(256),
+    @policyType VARCHAR(256),
+    @params VARBINARY(512),
+    @rowCount int OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        BEGIN TRAN
+
+            DELETE FROM [dbo].[policies]
+            WHERE [queue] = @queue;
+            INSERT INTO [dbo].[policies] (
+                [queue],
+                [policyType],
+                [params])
+            VALUES (
+                @queue,
+                @policyType,
+                @params);
+            SELECT @rowCount = @@ROWCOUNT;
+
+        COMMIT TRAN
+    END TRY
+
+    BEGIN CATCH
+        ROLLBACK TRAN
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPolicyConfiguration]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getPolicyConfiguration];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPolicyConfiguration]
+    @queue VARCHAR(256),
+    @policyType VARCHAR(256) OUTPUT,
+    @params VARBINARY(6000) OUTPUT
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+
+        SELECT @policyType = [policyType],
+               @params = [params]
+        FROM [dbo].[policies]
+        WHERE [queue] = @queue
+
+    END TRY
+
+    BEGIN CATCH
+
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO
+
+IF OBJECT_ID ( '[sp_getPoliciesConfigurations]', 'P' ) IS NOT NULL
+    DROP PROCEDURE [sp_getPoliciesConfigurations];
+GO
+
+CREATE PROCEDURE [dbo].[sp_getPoliciesConfigurations]
+AS BEGIN
+    DECLARE @errorMessage nvarchar(4000)
+
+    BEGIN TRY
+        SELECT  [queue], [policyType], [params] FROM [dbo].[policies]
+    END TRY
+
+    BEGIN CATCH
+        SET @errorMessage = dbo.func_FormatErrorMessage(ERROR_MESSAGE(), ERROR_LINE())
+
+        /*  raise error and terminate the execution */
+        RAISERROR(@errorMessage, --- Error Message
+            1, -- Severity
+            -1 -- State
+        ) WITH log
+    END CATCH
+END;
+GO

+ 122 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/FederationStateStore/SQLServer/FederationStateStoreTables.sql

@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+USE [FederationStateStore]
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+    WHERE name = 'applicationsHomeSubCluster'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table applicationsHomeSubCluster does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[applicationsHomeSubCluster](
+            applicationId   VARCHAR(64) COLLATE Latin1_General_100_BIN2 NOT NULL,
+            homeSubCluster  VARCHAR(256) NOT NULL,
+            createTime      DATETIME2 NOT NULL CONSTRAINT ts_createAppTime DEFAULT GETUTCDATE(),
+
+            CONSTRAINT [pk_applicationId] PRIMARY KEY
+            (
+                [applicationId]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table applicationsHomeSubCluster created.'
+    END
+ELSE
+    PRINT 'Table applicationsHomeSubCluster exists, no operation required...'
+    GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+    WHERE name = 'membership'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table membership does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[membership](
+            [subClusterId]            VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL,
+            [amRMServiceAddress]      VARCHAR(256) NOT NULL,
+            [clientRMServiceAddress]  VARCHAR(256) NOT NULL,
+            [rmAdminServiceAddress]   VARCHAR(256) NOT NULL,
+            [rmWebServiceAddress]     VARCHAR(256) NOT NULL,
+            [lastHeartBeat]           DATETIME2 NOT NULL,
+            [state]                   VARCHAR(32) NOT NULL,
+            [lastStartTime]           BIGINT NOT NULL,
+            [capability]              VARCHAR(6000) NOT NULL,
+
+            CONSTRAINT [pk_subClusterId] PRIMARY KEY
+            (
+                [subClusterId]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table membership created.'
+    END
+ELSE
+    PRINT 'Table membership exists, no operation required...'
+    GO
+GO
+
+IF NOT EXISTS ( SELECT * FROM [FederationStateStore].sys.tables
+    WHERE name = 'policies'
+    AND schema_id = SCHEMA_ID('dbo'))
+    BEGIN
+        PRINT 'Table policies does not exist, create it...'
+
+        SET ANSI_NULLS ON
+
+        SET QUOTED_IDENTIFIER ON
+
+        SET ANSI_PADDING ON
+
+        CREATE TABLE [dbo].[policies](
+            queue       VARCHAR(256) COLLATE Latin1_General_100_BIN2 NOT NULL,
+            policyType  VARCHAR(256) NOT NULL,
+            params      VARBINARY(6000) NOT NULL,
+
+            CONSTRAINT [pk_queue] PRIMARY KEY
+            (
+                [queue]
+            )
+        )
+
+        SET ANSI_PADDING OFF
+
+        PRINT 'Table policies created.'
+    END
+ELSE
+    PRINT 'Table policies exists, no operation required...'
+    GO
+GO