Browse Source

YARN-11509. The FederationInterceptor#launchUAM Added retry logic. (#5727)

slfan1989 1 năm trước cách đây
mục cha
commit
8b88e9f8f4

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -4058,6 +4058,20 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT =
       60000; // one minute
 
+  // AMRMProxy Register UAM Retry-Num
+  public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+      FEDERATION_PREFIX + "amrmproxy.register.uam.retry-count";
+  // Register a UAM , we will retry a maximum of 3 times.
+  public static final int DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT =
+      3;
+
+  // AMRMProxy Register UAM Retry Interval
+  public static final String FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+      FEDERATION_PREFIX + "amrmproxy.register.uam.interval";
+  // Retry Interval, default 100 ms
+  public static final long DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL =
+      TimeUnit.MILLISECONDS.toMillis(100);
+
   public static final String DEFAULT_FEDERATION_POLICY_KEY = "*";
   public static final String FEDERATION_POLICY_MANAGER = FEDERATION_PREFIX
       + "policy-manager";

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5408,4 +5408,22 @@
     </description>
   </property>
 
+  <property>
+    <description>
+      The number of retry for Register UAM.
+      The default value is 3.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.retry-count</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>
+      Interval between retry for Register UAM.
+      The default value is 100ms.
+    </description>
+    <name>yarn.federation.amrmproxy.register.uam.interval</name>
+    <value>100ms</value>
+  </property>
+
 </configuration>

+ 110 - 70
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -87,6 +88,7 @@ import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
@@ -251,6 +253,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   // the maximum wait time for the first async heart beat response
   private long heartbeatMaxWaitTimeMs;
 
+  private int registerUamRetryNum;
+
+  private long registerUamRetryInterval;
+
   private boolean waitUamRegisterDone;
 
   private MonotonicClock clock = new MonotonicClock();
@@ -355,6 +361,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       this.subClusterTimeOut =
           YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
     }
+
+    this.registerUamRetryNum = conf.getInt(
+        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+    if (this.registerUamRetryNum <= 0) {
+      LOG.info("{} configured to be {}, should be positive. Using default of {}.",
+          YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT,
+          this.subClusterTimeOut,
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT);
+      this.registerUamRetryNum =
+          YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_COUNT;
+    }
+
+    this.registerUamRetryInterval = conf.getTimeDuration(
+        YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+        YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL,
+        TimeUnit.MILLISECONDS);
+
     this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
         YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
   }
@@ -701,7 +725,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
 
     if (this.finishAMCalled) {
       LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
-          + "processing and return dummy response" + this.attemptId);
+          + "processing and return dummy response.", this.attemptId);
       return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
     }
 
@@ -1255,85 +1279,77 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
     // Check to see if there are any new sub-clusters in this request
     // list and create and register Unmanaged AM instance for the new ones
     List<SubClusterId> newSubClusters = new ArrayList<>();
-    for (SubClusterId subClusterId : requests.keySet()) {
-      if (!subClusterId.equals(this.homeSubClusterId)
-          && !this.uamPool.hasUAMId(subClusterId.getId())) {
-        newSubClusters.add(subClusterId);
 
+    requests.keySet().stream().forEach(subClusterId -> {
+      String id = subClusterId.getId();
+      if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
+        newSubClusters.add(subClusterId);
         // Set sub-cluster to be timed out initially
-        lastSCResponseTime.put(subClusterId,
-            clock.getTime() - subClusterTimeOut);
+        lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
       }
-    }
+    });
 
     this.uamRegisterFutures.clear();
+
     for (final SubClusterId scId : newSubClusters) {
-      Future<?> future = this.threadpool.submit(new Runnable() {
-        @Override
-        public void run() {
-          String subClusterId = scId.getId();
-
-          // Create a config loaded with federation on and subclusterId
-          // for each UAM
-          YarnConfiguration config = new YarnConfiguration(getConf());
-          FederationProxyProviderUtil.updateConfForFederation(config,
-              subClusterId);
-
-          RegisterApplicationMasterResponse uamResponse = null;
-          Token<AMRMTokenIdentifier> token = null;
-          try {
-            ApplicationId applicationId = attemptId.getApplicationId();
-            ApplicationSubmissionContext originalSubmissionContext =
-                federationFacade.getApplicationSubmissionContext(applicationId);
-
-            // For appNameSuffix, use subClusterId of the home sub-cluster
-            token = uamPool.launchUAM(subClusterId, config,
-                applicationId, amRegistrationResponse.getQueue(),
-                getApplicationContext().getUser(), homeSubClusterId.toString(),
-                true, subClusterId, originalSubmissionContext);
-
-            secondaryRelayers.put(subClusterId,
-                uamPool.getAMRMClientRelayer(subClusterId));
-
-            uamResponse = uamPool.registerApplicationMaster(subClusterId,
-                amRegistrationRequest);
-          } catch (Throwable e) {
-            LOG.error("Failed to register application master: " + subClusterId
-                + " Application: " + attemptId, e);
-            // TODO: UAM registration for this sub-cluster RM
-            // failed. For now, we ignore the resource requests and continue
-            // but we need to fix this and handle this situation. One way would
-            // be to send the request to another RM by consulting the policy.
-            return;
-          }
-          uamRegistrations.put(scId, uamResponse);
-          LOG.info("Successfully registered unmanaged application master: "
-              + subClusterId + " ApplicationId: " + attemptId);
 
-          try {
-            uamPool.allocateAsync(subClusterId, requests.get(scId),
-                new HeartbeatCallBack(scId, true));
-          } catch (Throwable e) {
-            LOG.error("Failed to allocate async to " + subClusterId
-                + " Application: " + attemptId, e);
-          }
+      Future<?> future = this.threadpool.submit(() -> {
 
-          // Save the UAM token in registry or NMSS
-          try {
-            if (registryClient != null) {
-              registryClient.writeAMRMTokenForUAM(attemptId.getApplicationId(),
-                  subClusterId, token);
-            } else if (getNMStateStore() != null) {
-              getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
-                  NMSS_SECONDARY_SC_PREFIX + subClusterId,
-                  token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
-            }
-          } catch (Throwable e) {
-            LOG.error("Failed to persist UAM token from " + subClusterId
-                + " Application: " + attemptId, e);
+        String subClusterId = scId.getId();
+
+        // Create a config loaded with federation on and subclusterId
+        // for each UAM
+        YarnConfiguration config = new YarnConfiguration(getConf());
+        FederationProxyProviderUtil.updateConfForFederation(config, subClusterId);
+        ApplicationId applicationId = attemptId.getApplicationId();
+
+        RegisterApplicationMasterResponse uamResponse;
+        Token<AMRMTokenIdentifier> token;
+
+        // LaunchUAM And RegisterApplicationMaster
+        try {
+          TokenAndRegisterResponse result =
+              ((FederationActionRetry<TokenAndRegisterResponse>) (retryCount) ->
+              launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId)).
+              runWithRetries(registerUamRetryNum, registerUamRetryInterval);
+
+          token = result.getToken();
+          uamResponse = result.getResponse();
+        } catch (Throwable e) {
+          LOG.error("Failed to register application master: {} Application: {}.",
+              subClusterId, attemptId, e);
+          return;
+        }
+
+        uamRegistrations.put(scId, uamResponse);
+
+        LOG.info("Successfully registered unmanaged application master: {} " +
+            "ApplicationId: {}.", subClusterId, attemptId);
+
+        // Allocate Request
+        try {
+          uamPool.allocateAsync(subClusterId, requests.get(scId),
+              new HeartbeatCallBack(scId, true));
+        } catch (Throwable e) {
+          LOG.error("Failed to allocate async to {} Application: {}.",
+              subClusterId, attemptId, e);
+        }
+
+        // Save the UAM token in registry or NMSS
+        try {
+          if (registryClient != null) {
+            registryClient.writeAMRMTokenForUAM(applicationId, subClusterId, token);
+          } else if (getNMStateStore() != null) {
+            getNMStateStore().storeAMRMProxyAppContextEntry(attemptId,
+                NMSS_SECONDARY_SC_PREFIX + subClusterId,
+                token.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
           }
+        } catch (Throwable e) {
+          LOG.error("Failed to persist UAM token from {} Application {}",
+              subClusterId, attemptId, e);
         }
       });
+
       this.uamRegisterFutures.put(scId, future);
     }
 
@@ -1347,10 +1363,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       }
     }
 
-
     return newSubClusters;
   }
 
+  protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(
+      YarnConfiguration config, String subClusterId, ApplicationId applicationId)
+      throws IOException, YarnException {
+
+    // Prepare parameter information
+    ApplicationSubmissionContext originalSubmissionContext =
+        federationFacade.getApplicationSubmissionContext(applicationId);
+    String submitter = getApplicationContext().getUser();
+    String homeRM = homeSubClusterId.toString();
+    String queue = amRegistrationResponse.getQueue();
+
+    // For appNameSuffix, use subClusterId of the home sub-cluster
+    Token<AMRMTokenIdentifier> token = uamPool.launchUAM(subClusterId, config, applicationId,
+        queue, submitter, homeRM, true, subClusterId, originalSubmissionContext);
+
+    // Set the relationship between SubCluster and AMRMClientRelayer.
+    secondaryRelayers.put(subClusterId, uamPool.getAMRMClientRelayer(subClusterId));
+
+    // RegisterApplicationMaster
+    RegisterApplicationMasterResponse uamResponse =
+        uamPool.registerApplicationMaster(subClusterId, amRegistrationRequest);
+
+    return new TokenAndRegisterResponse(token, uamResponse);
+  }
+
   /**
    * Prepare the base allocation response. Use lastSCResponse and
    * lastHeartbeatTimeStamp to assemble entries about cluster-wide info, e.g.

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TokenAndRegisterResponse.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+
+/**
+ * This class contains information about the AMRM token and the RegisterApplicationMasterResponse.
+ */
+public class TokenAndRegisterResponse {
+  private Token<AMRMTokenIdentifier> token;
+  private RegisterApplicationMasterResponse response;
+
+  public TokenAndRegisterResponse(Token<AMRMTokenIdentifier> pToken,
+      RegisterApplicationMasterResponse pResponse) {
+    this.token = pToken;
+    this.response = pResponse;
+  }
+
+  public Token<AMRMTokenIdentifier> getToken() {
+    return token;
+  }
+
+  public RegisterApplicationMasterResponse getResponse() {
+    return response;
+  }
+}

+ 52 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -38,7 +38,6 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -179,9 +178,8 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
         500);
 
-    // Wait UAM Register Down
-    conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
-
+    // Register UAM Retry Interval 1ms
+    conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_REGISTER_UAM_RETRY_INTERVAL, 1);
     return conf;
   }
 
@@ -597,10 +595,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
-        // Waiting for SC-1 to time out.
-        GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
         // SC1 should be initialized to be timed out
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
 
@@ -859,7 +853,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         List<Container> containers =
             getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
         for (Container c : containers) {
-          LOG.info("Allocated container {}", c.getId());
+          LOG.info("Allocated container " + c.getId());
         }
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
 
@@ -893,10 +887,6 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         int numberOfContainers = 3;
         // Should re-attach secondaries and get the three running containers
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
-
-        // Waiting for SC-1 to time out.
-        GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
-
         // SC1 should be initialized to be timed out
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(numberOfContainers,
@@ -1444,4 +1434,53 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     Assert.assertNotNull(finishResponse);
     Assert.assertTrue(finishResponse.getIsUnregistered());
   }
+
+  @Test
+  public void testLaunchUAMAndRegisterApplicationMasterRetry() throws Exception {
+
+    UserGroupInformation ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
+    interceptor.setRetryCount(2);
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+      // Register the application
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      registerSubCluster(SubClusterId.newInstance("SC-1"));
+
+      int numberOfContainers = 3;
+      List<Container> containers = getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+      // Release all containers
+      releaseContainersAndAssert(containers);
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResponse =
+          interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResponse);
+      Assert.assertTrue(finishResponse.getIsUnregistered());
+
+      return null;
+    });
+
+    Assert.assertEquals(0, interceptor.getRetryCount());
+  }
 }

+ 19 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestableFederationInterceptor.java

@@ -55,6 +55,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
   private MockResourceManagerFacade mockRm;
 
   private boolean isClientRPC = false;
+  private int retryCount = 0;
 
   public TestableFederationInterceptor() {
   }
@@ -258,6 +259,24 @@ public class TestableFederationInterceptor extends FederationInterceptor {
     }
   }
 
+  @Override
+  protected TokenAndRegisterResponse launchUAMAndRegisterApplicationMaster(YarnConfiguration config,
+      String subClusterId, ApplicationId applicationId) throws IOException, YarnException {
+    if (retryCount > 0) {
+      retryCount--;
+      throw new YarnException("launchUAMAndRegisterApplicationMaster will retry");
+    }
+    return super.launchUAMAndRegisterApplicationMaster(config, subClusterId, applicationId);
+  }
+
+  public void setRetryCount(int retryCount) {
+    this.retryCount = retryCount;
+  }
+
+  public int getRetryCount() {
+    return retryCount;
+  }
+
   /**
    * Wrap the handler thread, so it calls from the same user.
    */