Bladeren bron

YARN-8898. Fix FederationInterceptor#allocate to set application priority in allocateResponse. (#5645)

slfan1989 2 jaren geleden
bovenliggende
commit
bba663038d

+ 20 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java

@@ -989,6 +989,26 @@ public final class FederationStateStoreFacade {
     }
   }
 
+  /**
+   * Get ApplicationSubmissionContext according to ApplicationId.
+   * We don't throw exceptions. If the application cannot be found, we return null.
+   *
+   * @param appId ApplicationId
+   * @return ApplicationSubmissionContext of ApplicationId
+   */
+  public ApplicationSubmissionContext getApplicationSubmissionContext(ApplicationId appId) {
+    try {
+      GetApplicationHomeSubClusterResponse response = stateStore.getApplicationHomeSubCluster(
+          GetApplicationHomeSubClusterRequest.newInstance(appId));
+      ApplicationHomeSubCluster appHomeSubCluster = response.getApplicationHomeSubCluster();
+      return appHomeSubCluster.getApplicationSubmissionContext();
+    } catch (Exception e) {
+      LOG.error("getApplicationSubmissionContext error, applicationId = {}.", appId, e);
+      return null;
+    }
+  }
+
+
   @VisibleForTesting
   public FederationCache getFederationCache() {
     return federationCache;

+ 17 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java

@@ -125,6 +125,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery.
    * @param rmName name of the YarnRM
+   * @param originalAppSubmissionContext ApplicationSubmissionContext
    * @see ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)
    * @return uamId for the UAM
@@ -134,7 +135,8 @@ public class UnmanagedAMPoolManager extends AbstractService {
   public String createAndRegisterNewUAM(
       RegisterApplicationMasterRequest registerRequest, Configuration conf,
       String queueName, String submitter, String appNameSuffix,
-      boolean keepContainersAcrossApplicationAttempts, String rmName)
+      boolean keepContainersAcrossApplicationAttempts, String rmName,
+      ApplicationSubmissionContext originalAppSubmissionContext)
       throws YarnException, IOException {
     ApplicationId appId = null;
     ApplicationClientProtocol rmClient;
@@ -158,7 +160,8 @@ public class UnmanagedAMPoolManager extends AbstractService {
 
     // Launch the UAM in RM
     launchUAM(appId.toString(), conf, appId, queueName, submitter,
-        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
+        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName,
+        originalAppSubmissionContext);
 
     // Register the UAM application
     registerApplicationMaster(appId.toString(), registerRequest);
@@ -179,6 +182,7 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery.
    * @param rmName name of the YarnRM
+   * @param originalAppSubmissionContext AppSubmissionContext
    * @see ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)
    * @return UAM token
@@ -188,14 +192,15 @@ public class UnmanagedAMPoolManager extends AbstractService {
   public Token<AMRMTokenIdentifier> launchUAM(String uamId, Configuration conf,
       ApplicationId appId, String queueName, String submitter,
       String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
-      String rmName) throws YarnException, IOException {
+      String rmName, ApplicationSubmissionContext originalAppSubmissionContext)
+      throws YarnException, IOException {
 
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
     UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
         submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
-        rmName);
+        rmName, originalAppSubmissionContext);
     // Put the UAM into map first before initializing it to avoid additional UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
@@ -226,19 +231,21 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param appNameSuffix application name suffix for the UAM
    * @param uamToken UAM token
    * @param rmName name of the YarnRM
+   * @param originalAppSubmissionContext AppSubmissionContext
    * @throws YarnException if fails
    * @throws IOException if fails
    */
   public void reAttachUAM(String uamId, Configuration conf, ApplicationId appId,
       String queueName, String submitter, String appNameSuffix,
-      Token<AMRMTokenIdentifier> uamToken, String rmName)
+      Token<AMRMTokenIdentifier> uamToken, String rmName,
+      ApplicationSubmissionContext originalAppSubmissionContext)
       throws YarnException, IOException {
 
     if (this.unmanagedAppMasterMap.containsKey(uamId)) {
       throw new YarnException("UAM " + uamId + " already exists");
     }
     UnmanagedApplicationManager uam = createUAM(conf, appId, queueName,
-        submitter, appNameSuffix, true, rmName);
+        submitter, appNameSuffix, true, rmName, originalAppSubmissionContext);
     // Put the UAM into map first before initializing it to avoid additional UAM
     // for the same uamId being created concurrently
     this.unmanagedAppMasterMap.put(uamId, uam);
@@ -266,15 +273,17 @@ public class UnmanagedAMPoolManager extends AbstractService {
    * @param appNameSuffix application name suffix
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    * @param rmName name of the YarnRM
+   * @param originalAppSubmissionContext ApplicationSubmissionContext
    * @return the UAM instance
    */
   @VisibleForTesting
   protected UnmanagedApplicationManager createUAM(Configuration conf,
       ApplicationId appId, String queueName, String submitter,
       String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
-      String rmName) {
+      String rmName, ApplicationSubmissionContext originalAppSubmissionContext) {
     return new UnmanagedApplicationManager(conf, appId, queueName, submitter,
-        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName);
+        appNameSuffix, keepContainersAcrossApplicationAttempts, rmName,
+        originalAppSubmissionContext);
   }
 
   /**

+ 22 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java

@@ -99,6 +99,7 @@ public class UnmanagedApplicationManager {
   private long asyncApiPollIntervalMillis;
   private RecordFactory recordFactory;
   private boolean keepContainersAcrossApplicationAttempts;
+  private ApplicationSubmissionContext applicationSubmissionContext;
 
   /*
    * This flag is used as an indication that this method launchUAM/reAttachUAM
@@ -117,13 +118,15 @@ public class UnmanagedApplicationManager {
    * @param submitter user name of the app
    * @param appNameSuffix the app name suffix to use
    * @param rmName name of the YarnRM
+   * @param originalApplicationSubmissionContext ApplicationSubmissionContext
    * @param keepContainersAcrossApplicationAttempts keep container flag for UAM
    *          recovery. See {@link ApplicationSubmissionContext
    *          #setKeepContainersAcrossApplicationAttempts(boolean)}
    */
   public UnmanagedApplicationManager(Configuration conf, ApplicationId appId,
       String queueName, String submitter, String appNameSuffix,
-      boolean keepContainersAcrossApplicationAttempts, String rmName) {
+      boolean keepContainersAcrossApplicationAttempts, String rmName,
+      ApplicationSubmissionContext originalApplicationSubmissionContext) {
     Preconditions.checkNotNull(conf, "Configuration cannot be null");
     Preconditions.checkNotNull(appId, "ApplicationId cannot be null");
     Preconditions.checkNotNull(submitter, "App submitter cannot be null");
@@ -150,6 +153,7 @@ public class UnmanagedApplicationManager {
             DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
     this.keepContainersAcrossApplicationAttempts =
         keepContainersAcrossApplicationAttempts;
+    this.applicationSubmissionContext = originalApplicationSubmissionContext;
   }
 
   @VisibleForTesting
@@ -429,6 +433,18 @@ public class UnmanagedApplicationManager {
     Resource resource = Resources.createResource(1024);
     context.setResource(resource);
     context.setAMContainerSpec(amContainer);
+    if (applicationSubmissionContext != null) {
+      context.setApplicationType(applicationSubmissionContext.getApplicationType());
+      context.setKeepContainersAcrossApplicationAttempts(
+          applicationSubmissionContext.getKeepContainersAcrossApplicationAttempts());
+      context.setApplicationTags(applicationSubmissionContext.getApplicationTags());
+      context.setApplicationTimeouts(applicationSubmissionContext.getApplicationTimeouts());
+      context.setLogAggregationContext(applicationSubmissionContext.getLogAggregationContext());
+      context.setNodeLabelExpression(applicationSubmissionContext.getNodeLabelExpression());
+      context.setApplicationSchedulingPropertiesMap(
+          applicationSubmissionContext.getApplicationSchedulingPropertiesMap());
+      context.setPriority(applicationSubmissionContext.getPriority());
+    }
     submitRequest.setApplicationSubmissionContext(context);
 
     context.setUnmanagedAM(true);
@@ -551,4 +567,9 @@ public class UnmanagedApplicationManager {
   protected boolean isHeartbeatThreadAlive() {
     return this.heartbeatHandler.isAlive();
   }
+
+  @VisibleForTesting
+  public ApplicationSubmissionContext getApplicationSubmissionContext() {
+    return applicationSubmissionContext;
+  }
 }

+ 58 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java

@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.uam;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -36,8 +39,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -78,7 +84,7 @@ public class TestUnmanagedApplicationManager {
 
     uam = new TestableUnmanagedApplicationManager(conf,
         attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
-        "rm");
+        "rm", null);
 
     threadpool = Executors.newCachedThreadPool();
     uamPool = new TestableUnmanagedAMPoolManager(this.threadpool);
@@ -434,8 +440,16 @@ public class TestUnmanagedApplicationManager {
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
         String rmName) {
+      this(conf, appId, queueName, submitter, appNameSuffix,
+          keepContainersAcrossApplicationAttempts, rmName, null);
+    }
+
+    public TestableUnmanagedApplicationManager(Configuration conf,
+        ApplicationId appId, String queueName, String submitter,
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+        String rmName, ApplicationSubmissionContext originalApplicationSubmissionContext) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts, rmName);
+          keepContainersAcrossApplicationAttempts, rmName, originalApplicationSubmissionContext);
     }
 
     @Override
@@ -505,9 +519,11 @@ public class TestUnmanagedApplicationManager {
     @Override
     public UnmanagedApplicationManager createUAM(Configuration configuration,
         ApplicationId appId, String queueName, String submitter, String appNameSuffix,
-        boolean keepContainersAcrossApplicationAttempts, String rmId) {
+        boolean keepContainersAcrossApplicationAttempts, String rmId,
+        ApplicationSubmissionContext originalAppSubmissionContext) {
       return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter,
-          appNameSuffix, keepContainersAcrossApplicationAttempts, rmId);
+          appNameSuffix, keepContainersAcrossApplicationAttempts, rmId,
+          originalAppSubmissionContext);
     }
   }
 
@@ -516,13 +532,13 @@ public class TestUnmanagedApplicationManager {
     ApplicationAttemptId attemptId1 =
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 1), 1);
     Token<AMRMTokenIdentifier> token1 = uamPool.launchUAM("SC-1", this.conf,
-        attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1");
+        attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1", null);
     Assert.assertNotNull(token1);
 
     ApplicationAttemptId attemptId2 =
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(Time.now(), 2), 1);
     Token<AMRMTokenIdentifier> token2 = uamPool.launchUAM("SC-2", this.conf,
-        attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2");
+        attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2", null);
     Assert.assertNotNull(token2);
 
     Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap =
@@ -542,4 +558,40 @@ public class TestUnmanagedApplicationManager {
         100, 2000);
     Assert.assertEquals(0, unmanagedAppMasterMap.size());
   }
+
+  @Test
+  public void testApplicationAttributes()
+      throws IOException, YarnException, InterruptedException, TimeoutException {
+    long now = Time.now();
+    ApplicationId applicationId = ApplicationId.newInstance(now, 10);
+    ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext.newInstance(
+        applicationId, "test", "default", Priority.newInstance(10), null, true, true, 2,
+        Resource.newInstance(10, 2), "test");
+    Set<String> tags = Collections.singleton("1");
+    appSubmissionContext.setApplicationTags(tags);
+
+    Token<AMRMTokenIdentifier> token1 = uamPool.launchUAM("SC-1", this.conf,
+        applicationId, "default", "test-user", "SC-HOME", true, "SC-1", appSubmissionContext);
+    Assert.assertNotNull(token1);
+
+    Map<String, UnmanagedApplicationManager> unmanagedAppMasterMap =
+        uamPool.getUnmanagedAppMasterMap();
+
+    UnmanagedApplicationManager uamApplicationManager = unmanagedAppMasterMap.get("SC-1");
+    Assert.assertNotNull(uamApplicationManager);
+
+    ApplicationSubmissionContext appSubmissionContextByUam =
+        uamApplicationManager.getApplicationSubmissionContext();
+
+    Assert.assertNotNull(appSubmissionContext);
+    Assert.assertEquals(10, appSubmissionContextByUam.getPriority().getPriority());
+    Assert.assertEquals("test", appSubmissionContextByUam.getApplicationType());
+    Assert.assertEquals(1, appSubmissionContextByUam.getApplicationTags().size());
+
+    uamPool.stop();
+    Thread finishApplicationThread = uamPool.getFinishApplicationThread();
+    GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(),
+        100, 2000);
+    Assert.assertEquals(0, unmanagedAppMasterMap.size());
+  }
 }

+ 15 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMas
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
@@ -411,9 +412,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         FederationProxyProviderUtil.updateConfForFederation(config, keyScId);
 
         try {
+          ApplicationSubmissionContext originalSubmissionContext =
+              federationFacade.getApplicationSubmissionContext(applicationId);
+
           // ReAttachUAM
           this.uamPool.reAttachUAM(keyScId, config, applicationId, queue, user, homeSCId,
-              tokens, keyScId);
+              tokens, keyScId, originalSubmissionContext);
 
           // GetAMRMClientRelayer
           this.secondaryRelayers.put(keyScId, this.uamPool.getAMRMClientRelayer(keyScId));
@@ -1026,10 +1030,13 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                 FederationProxyProviderUtil.updateConfForFederation(config,
                     subClusterId.getId());
 
+                ApplicationSubmissionContext originalSubmissionContext =
+                    federationFacade.getApplicationSubmissionContext(appId);
+
                 uamPool.reAttachUAM(subClusterId.getId(), config, appId,
                     amRegistrationResponse.getQueue(),
                     getApplicationContext().getUser(), homeSubClusterId.getId(),
-                    amrmToken, subClusterId.toString());
+                    amrmToken, subClusterId.toString(), originalSubmissionContext);
 
                 secondaryRelayers.put(subClusterId.getId(),
                     uamPool.getAMRMClientRelayer(subClusterId.getId()));
@@ -1266,11 +1273,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           RegisterApplicationMasterResponse uamResponse = null;
           Token<AMRMTokenIdentifier> token = null;
           try {
+            ApplicationId applicationId = attemptId.getApplicationId();
+            ApplicationSubmissionContext originalSubmissionContext =
+                federationFacade.getApplicationSubmissionContext(applicationId);
+
             // For appNameSuffix, use subClusterId of the home sub-cluster
             token = uamPool.launchUAM(subClusterId, config,
-                attemptId.getApplicationId(), amRegistrationResponse.getQueue(),
+                applicationId, amRegistrationResponse.getQueue(),
                 getApplicationContext().getUser(), homeSubClusterId.toString(),
-                true, subClusterId);
+                true, subClusterId, originalSubmissionContext);
 
             secondaryRelayers.put(subClusterId,
                 uamPool.getAMRMClientRelayer(subClusterId));

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -214,10 +215,10 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     public UnmanagedApplicationManager createUAM(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
-        String rmId) {
+        String rmId, ApplicationSubmissionContext originalAppSubmissionContext) {
       return new TestableUnmanagedApplicationManager(conf, appId, queueName,
           submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
-          rmId);
+          rmId, originalAppSubmissionContext);
     }
   }
 
@@ -231,9 +232,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     public TestableUnmanagedApplicationManager(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
-        String rmName) {
+        String rmName, ApplicationSubmissionContext originalAppSubmissionContext) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts, rmName);
+          keepContainersAcrossApplicationAttempts, rmName, originalAppSubmissionContext);
     }
 
     @Override