Browse Source

YARN-11477. [Federation] MemoryFederationStateStore Support Store ApplicationSubmitData. (#5616)

slfan1989 2 years ago
parent
commit
cda9863d54
9 changed files with 152 additions and 7 deletions
  1. 20 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
  2. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java
  3. 26 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java
  4. 7 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
  5. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
  6. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
  7. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
  8. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
  9. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

+ 20 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java

@@ -34,6 +34,7 @@ import java.util.Comparator;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -252,9 +253,13 @@ public class MemoryFederationStateStore implements FederationStateStore {
 
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster();
-
+    SubClusterId homeSubClusterId = homeSubCluster.getHomeSubCluster();
+    ApplicationSubmissionContext appSubmissionContext = homeSubCluster.getApplicationSubmissionContext();
     ApplicationId appId = homeSubCluster.getApplicationId();
 
+    LOG.info("appId = {}, homeSubClusterId = {}, appSubmissionContext = {}.",
+        appId, homeSubClusterId, appSubmissionContext);
+
     if (!applications.containsKey(appId)) {
       applications.put(appId, homeSubCluster);
     }
@@ -292,8 +297,20 @@ public class MemoryFederationStateStore implements FederationStateStore {
           "Application %s does not exist.", appId);
     }
 
-    return GetApplicationHomeSubClusterResponse.newInstance(appId,
-        applications.get(appId).getHomeSubCluster());
+    // Whether the returned result contains context
+    ApplicationHomeSubCluster appHomeSubCluster = applications.get(appId);
+    ApplicationSubmissionContext submissionContext =
+        appHomeSubCluster.getApplicationSubmissionContext();
+    boolean containsAppSubmissionContext = request.getContainsAppSubmissionContext();
+    long creatTime = appHomeSubCluster.getCreateTime();
+    SubClusterId homeSubClusterId = appHomeSubCluster.getHomeSubCluster();
+
+    if (containsAppSubmissionContext && submissionContext != null) {
+      return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubClusterId, creatTime,
+          submissionContext);
+    }
+
+    return GetApplicationHomeSubClusterResponse.newInstance(appId, homeSubClusterId, creatTime);
   }
 
   @Override

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/ApplicationHomeSubCluster.java

@@ -77,6 +77,17 @@ public abstract class ApplicationHomeSubCluster {
     return appMapping;
   }
 
+  @Private
+  @Unstable
+  public static ApplicationHomeSubCluster newInstance(ApplicationId appId,
+      SubClusterId homeSubCluster, ApplicationSubmissionContext appSubmissionContext) {
+    ApplicationHomeSubCluster appMapping = Records.newRecord(ApplicationHomeSubCluster.class);
+    appMapping.setApplicationId(appId);
+    appMapping.setHomeSubCluster(homeSubCluster);
+    appMapping.setApplicationSubmissionContext(appSubmissionContext);
+    return appMapping;
+  }
+
   /**
    * Get the {@link ApplicationId} representing the unique identifier of the
    * application.

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetApplicationHomeSubClusterResponse.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -52,6 +53,31 @@ public abstract class GetApplicationHomeSubClusterResponse {
     return mapResponse;
   }
 
+  @Private
+  @Unstable
+  public static GetApplicationHomeSubClusterResponse newInstance(
+      ApplicationId appId, SubClusterId homeSubCluster, long createTime) {
+    ApplicationHomeSubCluster applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, createTime, homeSubCluster);
+    GetApplicationHomeSubClusterResponse mapResponse =
+        Records.newRecord(GetApplicationHomeSubClusterResponse.class);
+    mapResponse.setApplicationHomeSubCluster(applicationHomeSubCluster);
+    return mapResponse;
+  }
+
+  @Private
+  @Unstable
+  public static GetApplicationHomeSubClusterResponse newInstance(
+      ApplicationId appId, SubClusterId homeSubCluster, long createTime,
+      ApplicationSubmissionContext context) {
+    ApplicationHomeSubCluster applicationHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(appId, createTime, homeSubCluster, context);
+    GetApplicationHomeSubClusterResponse mapResponse =
+        Records.newRecord(GetApplicationHomeSubClusterResponse.class);
+    mapResponse.setApplicationHomeSubCluster(applicationHomeSubCluster);
+    return mapResponse;
+  }
+
   /**
    * Get the {@link ApplicationHomeSubCluster} representing the mapping of the
    * application to it's home sub-cluster.

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -38,7 +38,9 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -837,13 +839,16 @@ public final class FederationStateStoreFacade {
    * @param applicationId applicationId, is the id of the application.
    * @param subClusterId homeSubClusterId, this is selected by strategy.
    * @param retryCount number of retries.
+   * @param appSubmissionContext appSubmissionContext.
    * @throws YarnException yarn exception.
    */
   public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
-      SubClusterId subClusterId, int retryCount) throws YarnException {
+      SubClusterId subClusterId, int retryCount, ApplicationSubmissionContext appSubmissionContext)
+      throws YarnException {
     Boolean exists = existsApplicationHomeSubCluster(applicationId);
     ApplicationHomeSubCluster appHomeSubCluster =
-        ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+        ApplicationHomeSubCluster.newInstance(applicationId, Time.now(),
+        subClusterId, appSubmissionContext);
     if (!exists || retryCount == 0) {
       // persist the mapping of applicationId and the subClusterId which has
       // been selected as its home.

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
@@ -656,6 +657,16 @@ public abstract class FederationStateStoreBaseTest {
     stateStore.addApplicationHomeSubCluster(request);
   }
 
+  void addApplicationHomeSC(ApplicationId appId, SubClusterId subClusterId,
+      ApplicationSubmissionContext submissionContext) throws YarnException {
+    long createTime = Time.now();
+    ApplicationHomeSubCluster ahsc = ApplicationHomeSubCluster.newInstance(
+        appId, createTime, subClusterId, submissionContext);
+    AddApplicationHomeSubClusterRequest request =
+         AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+    stateStore.addApplicationHomeSubCluster(request);
+  }
+
   private void setPolicyConf(String queue, String policyType)
       throws YarnException {
     SetSubClusterPolicyConfigurationRequest request =

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java

@@ -19,6 +19,10 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -27,6 +31,10 @@ import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterRMDTSecretManagerState;
 import org.apache.hadoop.yarn.server.federation.store.records.RouterStoreToken;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -88,4 +96,30 @@ public class TestMemoryFederationStateStore extends FederationStateStoreBaseTest
     assertTrue(tokenIdentifier instanceof RMDelegationTokenIdentifier);
     assertEquals(identifier, tokenIdentifier);
   }
+
+  @Test
+  public void testGetApplicationHomeSubClusterWithContext() throws Exception {
+    MemoryFederationStateStore memoryStateStore =
+        MemoryFederationStateStore.class.cast(this.getStateStore());
+
+    ApplicationId appId = ApplicationId.newInstance(1, 3);
+    SubClusterId subClusterId = SubClusterId.newInstance("SC");
+    ApplicationSubmissionContext context =
+        ApplicationSubmissionContext.newInstance(appId, "test", "default",
+        Priority.newInstance(0), null, true, true,
+        2, Resource.newInstance(10, 2), "test");
+    addApplicationHomeSC(appId, subClusterId, context);
+
+    GetApplicationHomeSubClusterRequest getRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(appId, true);
+    GetApplicationHomeSubClusterResponse result =
+        memoryStateStore.getApplicationHomeSubCluster(getRequest);
+
+    assertEquals(appId,
+        result.getApplicationHomeSubCluster().getApplicationId());
+    assertEquals(subClusterId,
+        result.getApplicationHomeSubCluster().getHomeSubCluster());
+    assertEquals(context,
+        result.getApplicationHomeSubCluster().getApplicationSubmissionContext());
+  }
 }

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java

@@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationReque
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationRequestInfo;
 import org.apache.hadoop.yarn.api.records.ReservationDefinition;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -764,4 +766,39 @@ public final class RouterServerUtil {
       return b.toByteArray();
     }
   }
+
+  /**
+   * Get trimmed version of ApplicationSubmissionContext to be saved to
+   * Federation State Store.
+   *
+   * @param actualContext actual ApplicationSubmissionContext.
+   * @return trimmed ApplicationSubmissionContext.
+   */
+  @Private
+  @Unstable
+  public static ApplicationSubmissionContext getTrimmedAppSubmissionContext(
+      ApplicationSubmissionContext actualContext) {
+    if (actualContext == null) {
+      return null;
+    }
+
+    // Set Basic information
+    ApplicationSubmissionContext trimmedContext =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    trimmedContext.setApplicationId(actualContext.getApplicationId());
+    trimmedContext.setApplicationName(actualContext.getApplicationName());
+    trimmedContext.setQueue(actualContext.getQueue());
+    trimmedContext.setPriority(actualContext.getPriority());
+    trimmedContext.setApplicationType(actualContext.getApplicationType());
+    trimmedContext.setNodeLabelExpression(actualContext.getNodeLabelExpression());
+    trimmedContext.setLogAggregationContext(actualContext.getLogAggregationContext());
+    trimmedContext.setApplicationTags(actualContext.getApplicationTags());
+    trimmedContext.setApplicationSchedulingPropertiesMap(
+        actualContext.getApplicationSchedulingPropertiesMap());
+    trimmedContext.setKeepContainersAcrossApplicationAttempts(
+        actualContext.getKeepContainersAcrossApplicationAttempts());
+    trimmedContext.setApplicationTimeouts(actualContext.getApplicationTimeouts());
+
+    return trimmedContext;
+  }
 }

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java

@@ -565,8 +565,10 @@ public class FederationClientInterceptor
 
       // Step2. We Store the mapping relationship
       // between Application and HomeSubCluster in stateStore.
+      ApplicationSubmissionContext trimmedAppSubmissionContext =
+          RouterServerUtil.getTrimmedAppSubmissionContext(appSubmissionContext);
       federationFacade.addOrUpdateApplicationHomeSubCluster(
-          applicationId, subClusterId, retryCount);
+          applicationId, subClusterId, retryCount, trimmedAppSubmissionContext);
 
       // Step3. SubmitApplication to the subCluster
       ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java

@@ -559,8 +559,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
       // Step2. We Store the mapping relationship
       // between Application and HomeSubCluster in stateStore.
+      ApplicationSubmissionContext trimmedAppSubmissionContext =
+          RouterServerUtil.getTrimmedAppSubmissionContext(context);
       federationFacade.addOrUpdateApplicationHomeSubCluster(
-          applicationId, subClusterId, retryCount);
+          applicationId, subClusterId, retryCount, trimmedAppSubmissionContext);
 
       // Step3. We get subClusterInfo based on subClusterId.
       SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);