Explorar o código

YARN-10229. [Federation] Client should be able to submit application to RM directly using normal client conf. Contributed by Bilwa S T.

(cherry picked from commit eac558380fd7d3c2e78b8956e2080688bb1dd8bb)
Brahma Reddy Battula %!s(int64=4) %!d(string=hai) anos
pai
achega
643ff4881d

+ 32 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java

@@ -108,6 +108,8 @@ public class AMRMProxyService extends CompositeService implements
   private Map<ApplicationId, RequestInterceptorChainWrapper> applPipelineMap;
   private RegistryOperations registry;
   private AMRMProxyMetrics metrics;
+  private FederationStateStoreFacade federationFacade;
+  private boolean federationEnabled = false;
 
   /**
    * Creates an instance of the service.
@@ -144,7 +146,10 @@ public class AMRMProxyService extends CompositeService implements
           RegistryOperations.class);
       addService(this.registry);
     }
-
+    this.federationFacade = FederationStateStoreFacade.getInstance();
+    this.federationEnabled =
+        conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
+            YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
     super.serviceInit(conf);
   }
 
@@ -389,13 +394,22 @@ public class AMRMProxyService extends CompositeService implements
       throws IOException, YarnException {
     long startTime = clock.getTime();
     try {
-      LOG.info("Callback received for initializing request "
-          + "processing pipeline for an AM");
       ContainerTokenIdentifier containerTokenIdentifierForKey =
           BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
       ApplicationAttemptId appAttemptId =
           containerTokenIdentifierForKey.getContainerID()
               .getApplicationAttemptId();
+      ApplicationId applicationID = appAttemptId.getApplicationId();
+      // Checking if application is there in federation state store only
+      // if federation is enabled. If
+      // application is submitted to router then it adds it in statestore.
+      // if application is not found in statestore that means its
+      // submitted to RM
+      if (!checkIfAppExistsInStateStore(applicationID)) {
+        return;
+      }
+      LOG.info("Callback received for initializing request "
+          + "processing pipeline for an AM");
       Credentials credentials = YarnServerSecurityUtils
           .parseCredentials(request.getContainerLaunchContext());
 
@@ -772,6 +786,21 @@ public class AMRMProxyService extends CompositeService implements
     }
   }
 
+  boolean checkIfAppExistsInStateStore(ApplicationId applicationID) {
+    if (!federationEnabled) {
+      return true;
+    }
+
+    try {
+      // Check if app is there in state store. If app is not there then it
+      // throws Exception
+      this.federationFacade.getApplicationHomeSubCluster(applicationID);
+    } catch (YarnException ex) {
+      return false;
+    }
+    return true;
+  }
+
   @SuppressWarnings("unchecked")
   private Token<AMRMTokenIdentifier> getFirstAMRMToken(
       Collection<Token<? extends TokenIdentifier>> allTokens) {

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java

@@ -662,6 +662,27 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
     Assert.assertEquals(0, state.getAppContexts().size());
   }
 
+  @Test
+  public void testCheckIfAppExistsInStateStore()
+      throws IOException, YarnException {
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    Configuration conf = createConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    createAndStartAMRMProxyService(conf);
+
+    Assert.assertEquals(false,
+        getAMRMProxyService().checkIfAppExistsInStateStore(appId));
+
+    Configuration distConf = createConfiguration();
+    conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
+
+    createAndStartAMRMProxyService(distConf);
+
+    Assert.assertEquals(true,
+        getAMRMProxyService().checkIfAppExistsInStateStore(appId));
+  }
+
   /**
    * A mock intercepter implementation that uses the same mockRM instance across
    * restart.