Przeglądaj źródła

YARN-7652. Handle AM register requests asynchronously in FederationInterceptor. Contributed by Botong Huang.

Inigo Goiri 6 lat temu
rodzic
commit
c3d22d3b45

+ 2 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMHeartbeatRequestHandler.java

@@ -59,7 +59,7 @@ public class AMHeartbeatRequestHandler extends Thread {
   private int lastResponseId;
 
   public AMHeartbeatRequestHandler(Configuration conf,
-      ApplicationId applicationId) {
+      ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
     super("AMHeartbeatRequestHandler Heartbeat Handler Thread");
     this.setUncaughtExceptionHandler(
         new HeartBeatThreadUncaughtExceptionHandler());
@@ -69,6 +69,7 @@ public class AMHeartbeatRequestHandler extends Thread {
     this.conf = conf;
     this.applicationId = applicationId;
     this.requestQueue = new LinkedBlockingQueue<>();
+    this.rmProxyRelayer = rmProxyRelayer;
 
     resetLastResponseId();
   }
@@ -156,13 +157,6 @@ public class AMHeartbeatRequestHandler extends Thread {
     this.lastResponseId = 0;
   }
 
-  /**
-   * Set the AMRMClientRelayer for RM connection.
-   */
-  public void setAMRMClientRelayer(AMRMClientRelayer relayer) {
-    this.rmProxyRelayer = relayer;
-  }
-
   /**
    * Set the UGI for RM connection.
    */

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java

@@ -186,7 +186,11 @@ public class AMRMClientRelayer extends AbstractService
     this.amRegistrationRequest = registerRequest;
   }
 
-  public void setRMClient(ApplicationMasterProtocol client){
+  public String getRMIdentifier() {
+    return this.rmId;
+  }
+
+  public void setRMClient(ApplicationMasterProtocol client) {
     this.rmClient = client;
   }
 

+ 11 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java

@@ -134,10 +134,12 @@ public class UnmanagedApplicationManager {
     this.submitter = submitter;
     this.appNameSuffix = appNameSuffix;
     this.userUgi = null;
-    this.heartbeatHandler =
-        new AMHeartbeatRequestHandler(this.conf, this.applicationId);
+    // Relayer's rmClient will be set after the RM connection is created
     this.rmProxyRelayer =
         new AMRMClientRelayer(null, this.applicationId, rmName);
+    this.heartbeatHandler = createAMHeartbeatRequestHandler(this.conf,
+        this.applicationId, this.rmProxyRelayer);
+
     this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -150,6 +152,13 @@ public class UnmanagedApplicationManager {
         keepContainersAcrossApplicationAttempts;
   }
 
+  @VisibleForTesting
+  protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
+      Configuration config, ApplicationId appId,
+      AMRMClientRelayer relayer) {
+    return new AMHeartbeatRequestHandler(config, appId, relayer);
+  }
+
   /**
    * Launch a new UAM in the resource manager.
    *
@@ -191,8 +200,6 @@ public class UnmanagedApplicationManager {
         this.applicationId.toString(), UserGroupInformation.getCurrentUser());
     this.rmProxyRelayer.setRMClient(createRMProxy(
         ApplicationMasterProtocol.class, this.conf, this.userUgi, amrmToken));
-
-    this.heartbeatHandler.setAMRMClientRelayer(this.rmProxyRelayer);
     this.heartbeatHandler.setUGI(this.userUgi);
   }
 
@@ -521,11 +528,6 @@ public class UnmanagedApplicationManager {
     return this.heartbeatHandler.getRequestQueueSize();
   }
 
-  @VisibleForTesting
-  protected void setHandlerThread(AMHeartbeatRequestHandler thread) {
-    this.heartbeatHandler = thread;
-  }
-
   @VisibleForTesting
   protected void drainHeartbeatThread() {
     this.heartbeatHandler.drainHeartbeatThread();

+ 17 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -207,10 +207,15 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   private boolean shouldWaitForSyncNextAllocate = false;
 
   // For unit test synchronization
-  private static Object syncObj = new Object();
+  private static Object registerSyncObj = new Object();
+  private static Object allocateSyncObj = new Object();
 
-  public static Object getSyncObj() {
-    return syncObj;
+  public static Object getRegisterSyncObj() {
+    return registerSyncObj;
+  }
+
+  public static Object getAllocateSyncObj() {
+    return allocateSyncObj;
   }
 
   public MockResourceManagerFacade(Configuration conf,
@@ -290,14 +295,14 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     shouldReRegisterNext = false;
 
     // Make sure we wait for certain test cases last in the method
-    synchronized (syncObj) {
-      syncObj.notifyAll();
+    synchronized (registerSyncObj) {
+      registerSyncObj.notifyAll();
       // We reuse the port number to indicate whether the unit test wants us
       // to wait here
       if (request.getRpcPort() > 1000) {
         LOG.info("Register call in RM start waiting");
         try {
-          syncObj.wait();
+          registerSyncObj.wait();
           LOG.info("Register call in RM wait finished");
         } catch (InterruptedException e) {
           LOG.info("Register call in RM wait interrupted", e);
@@ -364,13 +369,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     }
 
     // Wait for signal for certain test cases
-    synchronized (syncObj) {
+    synchronized (allocateSyncObj) {
       if (shouldWaitForSyncNextAllocate) {
         shouldWaitForSyncNextAllocate = false;
 
         LOG.info("Allocate call in RM start waiting");
         try {
-          syncObj.wait();
+          allocateSyncObj.wait();
           LOG.info("Allocate call in RM wait finished");
         } catch (InterruptedException e) {
           LOG.info("Allocate call in RM wait interrupted", e);
@@ -431,8 +436,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
           }
 
           Assert.assertTrue("ContainerId " + id
-              + " being released is not valid for application: "
-              + conf.get("AMRMTOKEN"), found);
+              + " being released is not valid for application: " + attemptId,
+              found);
 
           ids.remove(id);
           completedList.add(
@@ -442,7 +447,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     }
 
     LOG.info("Allocating containers: " + containerList.size()
-        + " for application attempt: " + conf.get("AMRMTOKEN"));
+        + " for application attempt: " + attemptId);
 
     // Always issue a new AMRMToken as if RM rolled master key
     Token newAMRMToken = Token.newInstance(new byte[0],
@@ -455,7 +460,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   }
 
   public void setWaitForSyncNextAllocate(boolean wait) {
-    synchronized (syncObj) {
+    synchronized (allocateSyncObj) {
       shouldWaitForSyncNextAllocate = wait;
     }
   }

+ 45 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/uam/TestUnmanagedApplicationManager.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.junit.Assert;
@@ -65,7 +67,8 @@ public class TestUnmanagedApplicationManager {
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
 
     uam = new TestableUnmanagedApplicationManager(conf,
-        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
+        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
+        "rm");
   }
 
   protected void waitForCallBackCountAndCheckZeroPending(
@@ -121,7 +124,8 @@ public class TestUnmanagedApplicationManager {
 
     MockResourceManagerFacade rmProxy = uam.getRMProxy();
     uam = new TestableUnmanagedApplicationManager(conf,
-        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true);
+        attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true,
+        "rm");
     uam.setRMProxy(rmProxy);
 
     reAttachUAM(null, attemptId);
@@ -186,7 +190,7 @@ public class TestUnmanagedApplicationManager {
     });
 
     // Sync obj from mock RM
-    Object syncObj = MockResourceManagerFacade.getSyncObj();
+    Object syncObj = MockResourceManagerFacade.getRegisterSyncObj();
 
     // Wait for the register call in the thread to reach the RM and wake us
     synchronized (syncObj) {
@@ -365,16 +369,24 @@ public class TestUnmanagedApplicationManager {
   /**
    * Testable UnmanagedApplicationManager that talks to a mock RM.
    */
-  public static class TestableUnmanagedApplicationManager
+  public class TestableUnmanagedApplicationManager
       extends UnmanagedApplicationManager {
 
     private MockResourceManagerFacade rmProxy;
 
     public TestableUnmanagedApplicationManager(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+        String rmName) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts, "TEST");
+          keepContainersAcrossApplicationAttempts, rmName);
+    }
+
+    @Override
+    protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
+        Configuration config, ApplicationId appId,
+        AMRMClientRelayer rmProxyRelayer) {
+      return new TestableAMRequestHandlerThread(config, appId, rmProxyRelayer);
     }
 
     @SuppressWarnings("unchecked")
@@ -402,4 +414,31 @@ public class TestUnmanagedApplicationManager {
     }
   }
 
+  /**
+   * Wrap the handler thread so it calls from the same user.
+   */
+  public class TestableAMRequestHandlerThread
+      extends AMHeartbeatRequestHandler {
+    public TestableAMRequestHandlerThread(Configuration conf,
+        ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
+      super(conf, applicationId, rmProxyRelayer);
+    }
+
+    @Override
+    public void run() {
+      try {
+        getUGIWithToken(attemptId)
+            .doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() {
+                TestableAMRequestHandlerThread.super.run();
+                return null;
+              }
+            });
+      } catch (Exception e) {
+        LOG.error("Exception running TestableAMRequestHandlerThread", e);
+      }
+    }
+  }
+
 }

+ 112 - 179
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -167,6 +167,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    */
   private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
 
+  private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
+
+  // For unit test synchronization
+  private Map<SubClusterId, Future<?>> uamRegisterFutures;
+
   /** Thread pool used for asynchronous operations. */
   private ExecutorService threadpool;
 
@@ -227,6 +232,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public FederationInterceptor() {
     this.containerIdToSubClusterIdMap = new ConcurrentHashMap<>();
     this.asyncResponseSink = new ConcurrentHashMap<>();
+    this.uamRegistrations = new ConcurrentHashMap<>();
+    this.uamRegisterFutures = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
     this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
     this.secondaryRelayers = new ConcurrentHashMap<>();
@@ -279,8 +286,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         ApplicationMasterProtocol.class, appOwner), appId,
         this.homeSubClusterId.toString());
 
-    this.homeHeartbeartHandler = createHomeHeartbeartHandler(conf, appId);
-    this.homeHeartbeartHandler.setAMRMClientRelayer(this.homeRMRelayer);
+    this.homeHeartbeartHandler =
+        createHomeHeartbeartHandler(conf, appId, this.homeRMRelayer);
     this.homeHeartbeartHandler.setUGI(appOwner);
     this.homeHeartbeartHandler.setDaemon(true);
     this.homeHeartbeartHandler.start();
@@ -615,10 +622,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       /**
        * Send the requests to all the sub-cluster resource managers. All
        * requests are synchronously triggered but sent asynchronously. Later the
-       * responses will be collected and merged. In addition, it also returns
-       * the newly registered UAMs.
+       * responses will be collected and merged.
        */
-      Registrations newRegistrations = sendRequestsToResourceManagers(requests);
+      sendRequestsToResourceManagers(requests);
 
       // Wait for the first async response to arrive
       long startTime = this.clock.getTime();
@@ -646,9 +652,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
       // Merge the containers and NMTokens from the new registrations into
       // the response
-      if (!isNullOrEmpty(newRegistrations.getSuccessfulRegistrations())) {
-        mergeRegistrationResponses(response,
-            newRegistrations.getSuccessfulRegistrations());
+
+      if (!isNullOrEmpty(this.uamRegistrations)) {
+        Map<SubClusterId, RegisterApplicationMasterResponse> newRegistrations;
+        synchronized (this.uamRegistrations) {
+          newRegistrations = new HashMap<>(this.uamRegistrations);
+          this.uamRegistrations.clear();
+        }
+        mergeRegistrationResponses(response, newRegistrations);
       }
 
       // update the responseId and return the final response to AM
@@ -850,8 +861,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
   @VisibleForTesting
   protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
-      Configuration conf, ApplicationId appId) {
-    return new AMHeartbeatRequestHandler(conf, appId);
+      Configuration conf, ApplicationId appId,
+      AMRMClientRelayer rmProxyRelayer) {
+    return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer);
   }
 
   /**
@@ -1107,18 +1119,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
    *
    * @param requests contains the heart beat requests to send to the resource
    *          manager keyed by the sub-cluster id
-   * @return the registration responses from the newly added sub-cluster
-   *         resource managers
    * @throws YarnException
    * @throws IOException
    */
-  private Registrations sendRequestsToResourceManagers(
+  private void sendRequestsToResourceManagers(
       Map<SubClusterId, AllocateRequest> requests)
       throws YarnException, IOException {
 
-    // Create new UAM instances for the sub-cluster that we have not seen
-    // before
-    Registrations registrations = registerWithNewSubClusters(requests.keySet());
+    // Create new UAM instances for the sub-cluster that we haven't seen before
+    List<SubClusterId> newSubClusters =
+        registerAndAllocateWithNewSubClusters(requests);
 
     // Now that all the registrations are done, send the allocation request
     // to the sub-cluster RMs asynchronously and don't wait for the response.
@@ -1126,6 +1136,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     // response sink, then merged and sent to the application master.
     for (Entry<SubClusterId, AllocateRequest> entry : requests.entrySet()) {
       SubClusterId subClusterId = entry.getKey();
+      if (newSubClusters.contains(subClusterId)) {
+        // For new sub-clusters, we have already sent the request right after
+        // register in the async thread
+        continue;
+      }
 
       if (subClusterId.equals(this.homeSubClusterId)) {
         // Request for the home sub-cluster resource manager
@@ -1133,131 +1148,100 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
             new HeartbeatCallBack(this.homeSubClusterId, false));
       } else {
         if (!this.uamPool.hasUAMId(subClusterId.getId())) {
-          // TODO: This means that the registration for this sub-cluster RM
-          // failed. For now, we ignore the resource requests and continue
-          // but we need to fix this and handle this situation. One way would
-          // be to send the request to another RM by consulting the policy.
-          LOG.warn("Unmanaged AM registration not found for sub-cluster {}",
-              subClusterId);
-          continue;
+          throw new YarnException("UAM not found for " + this.attemptId
+              + " in sub-cluster " + subClusterId);
         }
         this.uamPool.allocateAsync(subClusterId.getId(), entry.getValue(),
             new HeartbeatCallBack(subClusterId, true));
       }
     }
-
-    return registrations;
   }
 
   /**
-   * This method ensures that Unmanaged AMs are created for each of the
-   * specified sub-cluster specified in the input and registers with the
-   * corresponding resource managers.
+   * This method ensures that Unmanaged AMs are created for newly specified
+   * sub-clusters, registers with the corresponding resource managers and sends
+   * the first allocate request asynchronously.
    */
-  private Registrations registerWithNewSubClusters(
-      Set<SubClusterId> subClusterSet) throws IOException {
-
-    List<SubClusterId> failedRegistrations = new ArrayList<>();
-    Map<SubClusterId, RegisterApplicationMasterResponse>
-        successfulRegistrations = new HashMap<>();
+  private List<SubClusterId> registerAndAllocateWithNewSubClusters(
+      final Map<SubClusterId, AllocateRequest> requests) throws IOException {
 
     // Check to see if there are any new sub-clusters in this request
     // list and create and register Unmanaged AM instance for the new ones
-    List<String> newSubClusters = new ArrayList<>();
-    for (SubClusterId subClusterId : subClusterSet) {
+    List<SubClusterId> newSubClusters = new ArrayList<>();
+    for (SubClusterId subClusterId : requests.keySet()) {
       if (!subClusterId.equals(this.homeSubClusterId)
           && !this.uamPool.hasUAMId(subClusterId.getId())) {
-        newSubClusters.add(subClusterId.getId());
+        newSubClusters.add(subClusterId);
       }
     }
 
-    if (newSubClusters.size() > 0) {
-      final RegisterApplicationMasterRequest registerRequest =
-          this.amRegistrationRequest;
-      final AMRMProxyApplicationContext appContext = getApplicationContext();
-      ExecutorCompletionService<RegisterApplicationMasterResponseInfo>
-          completionService = new ExecutorCompletionService<>(this.threadpool);
+    this.uamRegisterFutures.clear();
+    for (final SubClusterId scId : newSubClusters) {
+      Future<?> future = this.threadpool.submit(new Runnable() {
+        @Override
+        public void run() {
+          String subClusterId = scId.getId();
 
-      for (final String subClusterId : newSubClusters) {
-        completionService
-            .submit(new Callable<RegisterApplicationMasterResponseInfo>() {
-              @Override
-              public RegisterApplicationMasterResponseInfo call()
-                  throws Exception {
-
-                // Create a config loaded with federation on and subclusterId
-                // for each UAM
-                YarnConfiguration config = new YarnConfiguration(getConf());
-                FederationProxyProviderUtil.updateConfForFederation(config,
-                    subClusterId);
-
-                RegisterApplicationMasterResponse uamResponse = null;
-                Token<AMRMTokenIdentifier> token = null;
-                try {
-                  // For appNameSuffix, use subClusterId of the home sub-cluster
-                  token = uamPool.launchUAM(subClusterId, config,
-                      attemptId.getApplicationId(),
-                      amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString(), true, subClusterId);
-
-                  secondaryRelayers.put(subClusterId,
-                      uamPool.getAMRMClientRelayer(subClusterId));
-
-                  uamResponse = uamPool.registerApplicationMaster(subClusterId,
-                      registerRequest);
-                } catch (Throwable e) {
-                  LOG.error("Failed to register application master: "
-                      + subClusterId + " Application: " + attemptId, e);
-                }
-                return new RegisterApplicationMasterResponseInfo(uamResponse,
-                    SubClusterId.newInstance(subClusterId), token);
-              }
-            });
-      }
+          // Create a config loaded with federation on and subclusterId
+          // for each UAM
+          YarnConfiguration config = new YarnConfiguration(getConf());
+          FederationProxyProviderUtil.updateConfForFederation(config,
+              subClusterId);
 
-      // Wait for other sub-cluster resource managers to return the
-      // response and add it to the Map for returning to the caller
-      for (int i = 0; i < newSubClusters.size(); ++i) {
-        try {
-          Future<RegisterApplicationMasterResponseInfo> future =
-              completionService.take();
-          RegisterApplicationMasterResponseInfo uamResponse = future.get();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Received register application response from RM: "
-                + uamResponse.getSubClusterId());
+          RegisterApplicationMasterResponse uamResponse = null;
+          Token<AMRMTokenIdentifier> token = null;
+          try {
+            // For appNameSuffix, use subClusterId of the home sub-cluster
+            token = uamPool.launchUAM(subClusterId, config,
+                attemptId.getApplicationId(), amRegistrationResponse.getQueue(),
+                getApplicationContext().getUser(), homeSubClusterId.toString(),
+                true, subClusterId);
+
+            secondaryRelayers.put(subClusterId,
+                uamPool.getAMRMClientRelayer(subClusterId));
+
+            uamResponse = uamPool.registerApplicationMaster(subClusterId,
+                amRegistrationRequest);
+          } catch (Throwable e) {
+            LOG.error("Failed to register application master: " + subClusterId
+                + " Application: " + attemptId, e);
+            // TODO: UAM registration for this sub-cluster RM
+            // failed. For now, we ignore the resource requests and continue
+            // but we need to fix this and handle this situation. One way would
+            // be to send the request to another RM by consulting the policy.
+            return;
           }
+          uamRegistrations.put(scId, uamResponse);
+          LOG.info("Successfully registered unmanaged application master: "
+              + subClusterId + " ApplicationId: " + attemptId);
 
-          if (uamResponse.getResponse() == null) {
-            failedRegistrations.add(uamResponse.getSubClusterId());
-          } else {
-            LOG.info("Successfully registered unmanaged application master: "
-                + uamResponse.getSubClusterId() + " ApplicationId: "
-                + this.attemptId);
-            successfulRegistrations.put(uamResponse.getSubClusterId(),
-                uamResponse.getResponse());
+          try {
+            uamPool.allocateAsync(subClusterId, requests.get(scId),
+                new HeartbeatCallBack(scId, true));
+          } catch (Throwable e) {
+            LOG.error("Failed to allocate async to " + subClusterId
+                + " Application: " + attemptId, e);
+          }
 
-            // Save the UAM token in registry or NMSS
+          // Save the UAM token in registry or NMSS
+          try {
             if (registryClient != null) {
-              registryClient.writeAMRMTokenForUAM(
-                  this.attemptId.getApplicationId(),
-                  uamResponse.getSubClusterId().getId(),
-                  uamResponse.getUamToken());
+              registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
+                  subClusterId, token);
             } else if (getNMStateStore() != null) {
-              getNMStateStore().storeAMRMProxyAppContextEntry(this.attemptId,
-                  NMSS_SECONDARY_SC_PREFIX
-                      + uamResponse.getSubClusterId().getId(),
-                  uamResponse.getUamToken().encodeToUrlString()
-                      .getBytes(STRING_TO_BYTE_FORMAT));
+              getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+                  NMSS_SECONDARY_SC_PREFIX + subClusterId,
+                  token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
             }
+          } catch (Throwable e) {
+            LOG.error("Failed to persist UAM token from " + subClusterId
+                + " Application: " + attemptId, e);
           }
-        } catch (Exception e) {
-          LOG.warn("Failed to register unmanaged application master: "
-              + " ApplicationId: " + this.attemptId, e);
         }
-      }
+      });
+      this.uamRegisterFutures.put(scId, future);
     }
-
-    return new Registrations(successfulRegistrations, failedRegistrations);
+    return newSubClusters;
   }
 
   /**
@@ -1573,7 +1557,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   }
 
   @VisibleForTesting
-  public int getUnmanagedAMPoolSize() {
+  protected int getUnmanagedAMPoolSize() {
     return this.uamPool.getAllUAMIds().size();
   }
 
@@ -1582,6 +1566,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     return this.uamPool;
   }
 
+  @VisibleForTesting
+  protected Map<SubClusterId, Future<?>> getUamRegisterFutures() {
+    return this.uamRegisterFutures;
+  }
+
   @VisibleForTesting
   public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
     return this.asyncResponseSink;
@@ -1614,7 +1603,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
         asyncResponseSink.notifyAll();
       }
 
+      // Notify policy of allocate response
+      try {
+        policyInterpreter.notifyOfResponse(subClusterId, response);
+      } catch (YarnException e) {
+        LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+            + subClusterId, e);
+      }
+
       // Save the new AMRMToken for the UAM if present
+      // Do this last because it can be slow...
       if (this.isUAM && response.getAMRMToken() != null) {
         Token<AMRMTokenIdentifier> newToken = ConverterUtils
             .convertFromYarn(response.getAMRMToken(), (Text) null);
@@ -1648,44 +1646,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
           }
         }
       }
-
-      // Notify policy of allocate response
-      try {
-        policyInterpreter.notifyOfResponse(subClusterId, response);
-      } catch (YarnException e) {
-        LOG.warn("notifyOfResponse for policy failed for sub-cluster "
-            + subClusterId, e);
-      }
-    }
-  }
-
-  /**
-   * Private structure for encapsulating SubClusterId and
-   * RegisterApplicationMasterResponse instances.
-   */
-  private static class RegisterApplicationMasterResponseInfo {
-    private RegisterApplicationMasterResponse response;
-    private SubClusterId subClusterId;
-    private Token<AMRMTokenIdentifier> uamToken;
-
-    RegisterApplicationMasterResponseInfo(
-        RegisterApplicationMasterResponse response, SubClusterId subClusterId,
-        Token<AMRMTokenIdentifier> uamToken) {
-      this.response = response;
-      this.subClusterId = subClusterId;
-      this.uamToken = uamToken;
-    }
-
-    public RegisterApplicationMasterResponse getResponse() {
-      return response;
-    }
-
-    public SubClusterId getSubClusterId() {
-      return subClusterId;
-    }
-
-    public Token<AMRMTokenIdentifier> getUamToken() {
-      return uamToken;
     }
   }
 
@@ -1712,33 +1672,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     }
   }
 
-  /**
-   * Private structure for encapsulating successful and failed application
-   * master registration responses.
-   */
-  private static class Registrations {
-    private Map<SubClusterId, RegisterApplicationMasterResponse>
-        successfulRegistrations;
-    private List<SubClusterId> failedRegistrations;
-
-    Registrations(
-        Map<SubClusterId, RegisterApplicationMasterResponse>
-            successfulRegistrations,
-        List<SubClusterId> failedRegistrations) {
-      this.successfulRegistrations = successfulRegistrations;
-      this.failedRegistrations = failedRegistrations;
-    }
-
-    public Map<SubClusterId, RegisterApplicationMasterResponse>
-        getSuccessfulRegistrations() {
-      return this.successfulRegistrations;
-    }
-
-    public List<SubClusterId> getFailedRegistrations() {
-      return this.failedRegistrations;
-    }
-  }
-
   /**
    * Utility method to check if the specified Collection is null or empty.
    *

+ 7 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -201,9 +201,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     LOG.info("Number of allocated containers in the original request: "
         + Integer.toString(allocateResponse.getAllocatedContainers().size()));
 
-    // Make sure this request is picked up by all async heartbeat handlers
-    interceptor.drainAllAsyncQueue(false);
-
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
     int numHeartbeat = 0;
@@ -217,8 +214,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       checkAMRMToken(allocateResponse.getAMRMToken());
       lastResponseId = allocateResponse.getResponseId();
 
-      containers.addAll(allocateResponse.getAllocatedContainers());
+      // Make sure this request is picked up by all async heartbeat handlers
+      interceptor.drainAllAsyncQueue(false);
 
+      containers.addAll(allocateResponse.getAllocatedContainers());
       LOG.info("Number of allocated containers in this request: "
           + Integer.toString(allocateResponse.getAllocatedContainers().size()));
       LOG.info("Total number of allocated containers: "
@@ -258,9 +257,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     LOG.info("Number of containers received in the original request: "
         + Integer.toString(newlyFinished.size()));
 
-    // Make sure this request is picked up by all async heartbeat handlers
-    interceptor.drainAllAsyncQueue(false);
-
     // Send max 10 heart beats to receive all the containers. If not, we will
     // fail the test
     int numHeartbeat = 0;
@@ -273,10 +269,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
       checkAMRMToken(allocateResponse.getAMRMToken());
       lastResponseId = allocateResponse.getResponseId();
 
+      // Make sure this request is picked up by all async heartbeat handlers
+      interceptor.drainAllAsyncQueue(false);
+
       newlyFinished = getCompletedContainerIds(
           allocateResponse.getCompletedContainersStatuses());
       containersForReleasedContainerIds.addAll(newlyFinished);
-
       LOG.info("Number of containers received in this request: "
           + Integer.toString(newlyFinished.size()));
       LOG.info("Total number of containers received: "
@@ -438,7 +436,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
         new ExecutorCompletionService<>(threadpool);
 
-    Object syncObj = MockResourceManagerFacade.getSyncObj();
+    Object syncObj = MockResourceManagerFacade.getRegisterSyncObj();
 
     // Two register threads
     synchronized (syncObj) {

+ 18 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
@@ -69,8 +70,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
 
   @Override
   protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
-      Configuration conf, ApplicationId appId) {
-    return new TestableAMRequestHandlerThread(conf, appId);
+      Configuration conf, ApplicationId appId,
+      AMRMClientRelayer rmProxyRelayer) {
+    return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
   }
 
   @SuppressWarnings("unchecked")
@@ -205,7 +207,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
         String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
         String rmId) {
       return new TestableUnmanagedApplicationManager(conf, appId, queueName,
-          submitter, appNameSuffix, keepContainersAcrossApplicationAttempts);
+          submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
+          rmId);
     }
   }
 
@@ -218,10 +221,17 @@ public class TestableFederationInterceptor extends FederationInterceptor {
 
     public TestableUnmanagedApplicationManager(Configuration conf,
         ApplicationId appId, String queueName, String submitter,
-        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts) {
+        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
+        String rmName) {
       super(conf, appId, queueName, submitter, appNameSuffix,
-          keepContainersAcrossApplicationAttempts, "TEST");
-      setHandlerThread(new TestableAMRequestHandlerThread(conf, appId));
+          keepContainersAcrossApplicationAttempts, rmName);
+    }
+
+    @Override
+    protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
+        Configuration conf, ApplicationId appId,
+        AMRMClientRelayer rmProxyRelayer) {
+      return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
     }
 
     /**
@@ -244,8 +254,8 @@ public class TestableFederationInterceptor extends FederationInterceptor {
   protected class TestableAMRequestHandlerThread
       extends AMHeartbeatRequestHandler {
     public TestableAMRequestHandlerThread(Configuration conf,
-        ApplicationId applicationId) {
-      super(conf, applicationId);
+        ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
+      super(conf, applicationId, rmProxyRelayer);
     }
 
     @Override