瀏覽代碼

YARN-11479. [Federation] ZookeeperFederationStateStore Support Store ApplicationSubmitData. (#5631)

slfan1989 2 年之前
父節點
當前提交
690db3c34b

+ 105 - 48
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java

@@ -37,11 +37,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.curator.ZKCuratorManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
 import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ApplicationHomeSubClusterProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.security.client.YARNDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -94,6 +96,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterRMTokenReque
 import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
 import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
 import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.ApplicationHomeSubClusterPBImpl;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
@@ -323,49 +326,76 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
     long start = clock.getTime();
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
-    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
-    ApplicationId appId = app.getApplicationId();
+
+    // parse parameters
+    // We need to get applicationId, subClusterId, appSubmissionContext 3 parameters.
+    ApplicationHomeSubCluster requestApplicationHomeSubCluster =
+        request.getApplicationHomeSubCluster();
+    ApplicationId requestApplicationId = requestApplicationHomeSubCluster.getApplicationId();
+    SubClusterId requestSubClusterId = requestApplicationHomeSubCluster.getHomeSubCluster();
+    ApplicationSubmissionContext requestApplicationSubmissionContext =
+         requestApplicationHomeSubCluster.getApplicationSubmissionContext();
+
+    LOG.debug("applicationId = {}, subClusterId = {}, appSubmissionContext = {}.",
+        requestApplicationId, requestSubClusterId, requestApplicationSubmissionContext);
 
     // Try to write the subcluster
-    SubClusterId homeSubCluster = app.getHomeSubCluster();
     try {
-      putApp(appId, homeSubCluster, false);
+      storeOrUpdateApplicationHomeSubCluster(requestApplicationId,
+          requestApplicationHomeSubCluster, false);
     } catch (Exception e) {
-      String errMsg = "Cannot add application home subcluster for " + appId;
+      String errMsg = "Cannot add application home subcluster for " + requestApplicationId;
       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
 
     // Check for the actual subcluster
     try {
-      homeSubCluster = getApp(appId);
+      // We try to get the ApplicationHomeSubCluster actually stored in ZK
+      // according to the applicationId.
+      ApplicationHomeSubCluster actualAppHomeSubCluster =
+          getApplicationHomeSubCluster(requestApplicationId);
+      SubClusterId responseSubClusterId = actualAppHomeSubCluster.getHomeSubCluster();
+      long end = clock.getTime();
+      opDurations.addAppHomeSubClusterDuration(start, end);
+      return AddApplicationHomeSubClusterResponse.newInstance(responseSubClusterId);
     } catch (Exception e) {
-      String errMsg = "Cannot check app home subcluster for " + appId;
+      String errMsg = "Cannot check app home subcluster for " + requestApplicationId;
       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
-    long end = clock.getTime();
-    opDurations.addAppHomeSubClusterDuration(start, end);
-    return AddApplicationHomeSubClusterResponse
-        .newInstance(homeSubCluster);
+
+    // Throw YarnException.
+    throw new YarnException("Cannot addApplicationHomeSubCluster by request");
   }
 
   @Override
-  public UpdateApplicationHomeSubClusterResponse
-      updateApplicationHomeSubCluster(
-          UpdateApplicationHomeSubClusterRequest request)
-              throws YarnException {
+  public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster(
+      UpdateApplicationHomeSubClusterRequest request) throws YarnException {
 
     long start = clock.getTime();
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
-    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
-    ApplicationId appId = app.getApplicationId();
-    SubClusterId homeSubCluster = getApp(appId);
-    if (homeSubCluster == null) {
-      String errMsg = "Application " + appId + " does not exist";
+    ApplicationHomeSubCluster requestApplicationHomeSubCluster =
+        request.getApplicationHomeSubCluster();
+    ApplicationId requestApplicationId = requestApplicationHomeSubCluster.getApplicationId();
+    ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster =
+        getApplicationHomeSubCluster(requestApplicationId);
+
+    if (zkStoreApplicationHomeSubCluster == null) {
+      String errMsg = "Application " + requestApplicationId + " does not exist";
       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
-    SubClusterId newSubClusterId =
-        request.getApplicationHomeSubCluster().getHomeSubCluster();
-    putApp(appId, newSubClusterId, true);
+
+    SubClusterId oldSubClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster();
+    SubClusterId newSubClusterId = requestApplicationHomeSubCluster.getHomeSubCluster();
+    ApplicationSubmissionContext requestApplicationSubmissionContext =
+        requestApplicationHomeSubCluster.getApplicationSubmissionContext();
+
+    LOG.debug("applicationId = {}, oldHomeSubClusterId = {}, newHomeSubClusterId = {}, " +
+        "appSubmissionContext = {}.", requestApplicationId, oldSubClusterId, newSubClusterId,
+        requestApplicationSubmissionContext);
+
+    // update stored ApplicationHomeSubCluster
+    storeOrUpdateApplicationHomeSubCluster(requestApplicationId,
+        requestApplicationHomeSubCluster, true);
 
     long end = clock.getTime();
     opDurations.addUpdateAppHomeSubClusterDuration(start, end);
@@ -378,15 +408,33 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
     long start = clock.getTime();
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
-    ApplicationId appId = request.getApplicationId();
-    SubClusterId homeSubCluster = getApp(appId);
-    if (homeSubCluster == null) {
-      String errMsg = "Application " + appId + " does not exist";
+    ApplicationId requestApplicationId = request.getApplicationId();
+
+    ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster =
+        getApplicationHomeSubCluster(requestApplicationId);
+    if (zkStoreApplicationHomeSubCluster == null) {
+      String errMsg = "Application " + requestApplicationId + " does not exist";
       FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
     }
+
+    // Prepare to return data
+    SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster();
+    long createTime = zkStoreApplicationHomeSubCluster.getCreateTime();
+
     long end = clock.getTime();
     opDurations.addGetAppHomeSubClusterDuration(start, end);
-    return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubCluster);
+
+    // If the request asks for an ApplicationSubmissionContext to be returned,
+    // we will return
+    if (request.getContainsAppSubmissionContext()) {
+      ApplicationSubmissionContext submissionContext =
+          zkStoreApplicationHomeSubCluster.getApplicationSubmissionContext();
+      return GetApplicationHomeSubClusterResponse.newInstance(
+          requestApplicationId, subClusterId, createTime, submissionContext);
+    }
+
+    return GetApplicationHomeSubClusterResponse.newInstance(requestApplicationId,
+        subClusterId, createTime);
   }
 
   @Override
@@ -421,13 +469,18 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
   private ApplicationHomeSubCluster generateAppHomeSC(String appId) {
     try {
+      // Parse ApplicationHomeSubCluster
       ApplicationId applicationId = ApplicationId.fromString(appId);
-      SubClusterId homeSubCluster = getApp(applicationId);
-      ApplicationHomeSubCluster app =
-          ApplicationHomeSubCluster.newInstance(applicationId, homeSubCluster);
-      return app;
+      ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster =
+          getApplicationHomeSubCluster(applicationId);
+
+      // Prepare to return data
+      SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster();
+      ApplicationHomeSubCluster resultApplicationHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+      return resultApplicationHomeSubCluster;
     } catch (Exception ex) {
-      LOG.error("get homeSubCluster by appId = {}.", appId);
+      LOG.error("get homeSubCluster by appId = {}.", appId, ex);
     }
     return null;
   }
@@ -674,39 +727,43 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
 
   /**
    * Get the subcluster for an application.
+   *
    * @param appId Application identifier.
-   * @return Subcluster identifier.
+   * @return ApplicationHomeSubCluster identifier.
    * @throws Exception If it cannot contact ZooKeeper.
    */
-  private SubClusterId getApp(final ApplicationId appId) throws YarnException {
+  private ApplicationHomeSubCluster getApplicationHomeSubCluster(
+      final ApplicationId appId) throws YarnException {
     String appZNode = getNodePath(appsZNode, appId.toString());
 
-    SubClusterId subClusterId = null;
+    ApplicationHomeSubCluster appHomeSubCluster = null;
     byte[] data = get(appZNode);
     if (data != null) {
       try {
-        subClusterId = new SubClusterIdPBImpl(
-            SubClusterIdProto.parseFrom(data));
+        appHomeSubCluster = new ApplicationHomeSubClusterPBImpl(
+            ApplicationHomeSubClusterProto.parseFrom(data));
       } catch (InvalidProtocolBufferException e) {
         String errMsg = "Cannot parse application at " + appZNode;
         FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
       }
     }
-    return subClusterId;
+    return appHomeSubCluster;
   }
 
   /**
-   * Put an application.
-   * @param appId Application identifier.
-   * @param subClusterId Subcluster identifier.
+   * We will store the data of ApplicationHomeSubCluster according to appId.
+   *
+   * @param applicationId ApplicationId.
+   * @param applicationHomeSubCluster ApplicationHomeSubCluster.
+   * @param update false, add records; true, update records.
    * @throws Exception If it cannot contact ZooKeeper.
    */
-  private void putApp(final ApplicationId appId,
-      final SubClusterId subClusterId, boolean update)
-          throws YarnException {
-    String appZNode = getNodePath(appsZNode, appId.toString());
-    SubClusterIdProto proto =
-        ((SubClusterIdPBImpl)subClusterId).getProto();
+  private void storeOrUpdateApplicationHomeSubCluster(final ApplicationId applicationId,
+      final ApplicationHomeSubCluster applicationHomeSubCluster, boolean update)
+      throws YarnException {
+    String appZNode = getNodePath(appsZNode, applicationId.toString());
+    ApplicationHomeSubClusterProto proto =
+        ((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto();
     byte[] data = proto.toByteArray();
     put(appZNode, data, update);
   }

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java

@@ -406,7 +406,7 @@ public abstract class FederationStateStoreBaseTest {
     ApplicationId appId1 = ApplicationId.newInstance(1, 1);
     SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
     ApplicationHomeSubCluster ahsc1 =
-        ApplicationHomeSubCluster.newInstance(appId1, subClusterId1);
+        ApplicationHomeSubCluster.newInstance(appId1,  subClusterId1);
 
     ApplicationId appId2 = ApplicationId.newInstance(1, 2);
     SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
@@ -470,6 +470,7 @@ public abstract class FederationStateStoreBaseTest {
     Assert.assertEquals(10, items.size());
 
     for (ApplicationHomeSubCluster item : items) {
+      appHomeSubClusters.contains(item);
       Assert.assertTrue(appHomeSubClusters.contains(item));
     }
   }

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java

@@ -33,6 +33,10 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.metrics2.impl.MetricsRecords;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -41,6 +45,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Before;
@@ -276,4 +284,29 @@ public class TestZookeeperFederationStateStore extends FederationStateStoreBaseT
     assertNotNull(zkRouterStoreToken);
     assertEquals(token, zkRouterStoreToken);
   }
+
+  @Test
+  public void testGetApplicationHomeSubClusterWithContext() throws Exception {
+    ZookeeperFederationStateStore zkFederationStateStore =
+        (ZookeeperFederationStateStore) this.getStateStore();
+
+    ApplicationId appId = ApplicationId.newInstance(1, 3);
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    ApplicationSubmissionContext context =
+        ApplicationSubmissionContext.newInstance(appId, "test", "default",
+        Priority.newInstance(0), null, true, true,
+        2, Resource.newInstance(10, 2), "test");
+    addApplicationHomeSC(appId, subClusterId, context);
+
+    GetApplicationHomeSubClusterRequest getRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(appId, true);
+    GetApplicationHomeSubClusterResponse result =
+        zkFederationStateStore.getApplicationHomeSubCluster(getRequest);
+
+    ApplicationHomeSubCluster applicationHomeSubCluster = result.getApplicationHomeSubCluster();
+
+    assertEquals(appId, applicationHomeSubCluster.getApplicationId());
+    assertEquals(subClusterId, applicationHomeSubCluster.getHomeSubCluster());
+    assertEquals(context, applicationHomeSubCluster.getApplicationSubmissionContext());
+  }
 }