YARN-6667. Handle containerId duplicate without failing the heartbeat in Federation Interceptor. (#4810)

slfan1989 2 years ago
parent commit 3a96de7756

+ 65 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMR
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
@@ -1475,6 +1476,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   private void cacheAllocatedContainers(List<Container> containers,
       SubClusterId subClusterId) {
     for (Container container : containers) {
+      SubClusterId chooseSubClusterId = SubClusterId.newInstance(subClusterId.toString());
       LOG.debug("Adding container {}", container);
 
       if (this.containerIdToSubClusterIdMap.containsKey(container.getId())) {
@@ -1497,22 +1499,53 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
                   + " from same sub-cluster: {}, so ignoring.",
               container.getId(), subClusterId);
         } else {
+
+          LOG.info("Duplicate containerId found in the allocated containers, " +
+              "trying to re-pick the sub-cluster.");
+
           // The same container allocation from different sub-clusters,
           // something is wrong.
-          // TODO: YARN-6667 if some subcluster RM is configured wrong, we
-          // should not fail the entire heartbeat.
-          throw new YarnRuntimeException(
-              "Duplicate containerID found in the allocated containers. This"
-                  + " can happen if the RM epoch is not configured properly."
-                  + " ContainerId: " + container.getId().toString()
-                  + " ApplicationId: " + this.attemptId + " From RM: "
-                  + subClusterId
-                  + " . Previous container was from sub-cluster: "
-                  + existingSubClusterId);
+          try {
+
+            boolean existAllocatedScHealth = isSCHealth(existingSubClusterId);
+            boolean newAllocatedScHealth = isSCHealth(subClusterId);
+
+            if (existAllocatedScHealth) {
+              // If the sub-cluster that previously allocated this container is
+              // still healthy, keep using the previous sub-cluster.
+              LOG.info("Use Previous Allocated Container's subCluster. " +
+                  "ContainerId: {} ApplicationId: {} From RM: {}.", container.getId(),
+                  this.attemptId, existingSubClusterId);
+              chooseSubClusterId = existingSubClusterId;
+            } else if (newAllocatedScHealth) {
+              // If the sub-cluster of the previous allocation is unhealthy but the
+              // sub-cluster of the new allocation is healthy, use the new sub-cluster.
+              LOG.info("Use Newly Allocated Container's subCluster. " +
+                  "ApplicationId: {} ContainerId: {} From RM: {}.", this.attemptId,
+                  container.getId(), subClusterId);
+              chooseSubClusterId = subClusterId;
+            } else {
+              // Neither the sub-cluster of the previous allocation nor the
+              // sub-cluster of the new allocation is healthy, so there is no
+              // usable sub-cluster left for this container.
+              throw new YarnRuntimeException(
+                  " Can't use any subCluster because an exception occurred" +
+                  " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId +
+                  " From RM: " + subClusterId + ". " +
+                  " Previous Container was From subCluster: " + existingSubClusterId);
+            }
+          } catch (Exception ex) {
+            // The health check failed or no sub-cluster is usable; re-throw.
+            throw new YarnRuntimeException(
+                " Can't use any subCluster because an exception occurred" +
+                " ContainerId: " + container.getId() + " ApplicationId: " + this.attemptId +
+                " From RM: " + subClusterId + ". " +
+                " Previous Container was From subCluster: " + existingSubClusterId, ex);
+          }
         }
       }
 
-      this.containerIdToSubClusterIdMap.put(container.getId(), subClusterId);
+      this.containerIdToSubClusterIdMap.put(container.getId(), chooseSubClusterId);
     }
   }
 
@@ -1761,4 +1794,25 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   public static <T1, T2> boolean isNullOrEmpty(Map<T1, T2> c) {
     return (c == null || c.size() == 0);
   }
+
+  @VisibleForTesting
+  protected void cacheAllocatedContainersForSubClusterId(
+      List<Container> containers, SubClusterId subClusterId) {
+    cacheAllocatedContainers(containers, subClusterId);
+  }
+
+  @VisibleForTesting
+  protected Map<ContainerId, SubClusterId> getContainerIdToSubClusterIdMap() {
+    return containerIdToSubClusterIdMap;
+  }
+
+  /**
+   * A sub-cluster is usable for placing a container only if it has not timed
+   * out and its state in the federation state store is not unusable.
+   */
+  private boolean isSCHealth(SubClusterId subClusterId) throws YarnException {
+    Set<SubClusterId> timeOutScs = getTimedOutSCs(true);
+    SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    return !timeOutScs.contains(subClusterId)
+        && subClusterInfo != null && !subClusterInfo.getState().isUnusable();
+  }
 }
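
For readers skimming the diff, the new duplicate-handling logic reduces to a three-way choice. Below is a minimal standalone sketch of that decision order; the names DuplicateContainerPolicySketch, HealthCheck and pickSubCluster are made up for illustration, and the real code bases its health check on getTimedOutSCs() and the federation state store as shown above.

// Illustrative sketch only, not the actual YARN classes.
public final class DuplicateContainerPolicySketch {

  /** Simplified stand-in for the health check done by isSCHealth(). */
  interface HealthCheck {
    boolean isHealthy(String subClusterId);
  }

  /**
   * Decision order introduced by the patch:
   * 1. keep the previously recorded sub-cluster if it is still healthy,
   * 2. otherwise fall back to the newly reporting sub-cluster if that one is healthy,
   * 3. otherwise fail, because no usable sub-cluster is left for this container.
   */
  static String pickSubCluster(String previous, String current, HealthCheck check) {
    if (check.isHealthy(previous)) {
      return previous;
    }
    if (check.isHealthy(current)) {
      return current;
    }
    throw new IllegalStateException(
        "Neither " + previous + " nor " + current + " is usable for this container");
  }

  public static void main(String[] args) {
    // Example: SC-1 is down, SC-2 is up, so the container is re-mapped to SC-2.
    HealthCheck onlySc2Up = "SC-2"::equals;
    System.out.println(pickSubCluster("SC-1", "SC-2", onlySc2Up)); // prints SC-2
  }
}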

+ 97 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
@@ -970,6 +972,101 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     return preemptionMessage;
   }
 
+  @Test
+  public void testSameContainerFromDiffRM() throws IOException, InterruptedException {
+
+    UserGroupInformation ugi =
+        interceptor.getUGIWithToken(interceptor.getAttemptId());
+
+    ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
+
+      // Register the application
+      RegisterApplicationMasterRequest registerReq =
+          Records.newRecord(RegisterApplicationMasterRequest.class);
+      registerReq.setHost(Integer.toString(testAppId));
+      registerReq.setRpcPort(0);
+      registerReq.setTrackingUrl("");
+
+      RegisterApplicationMasterResponse registerResponse =
+          interceptor.registerApplicationMaster(registerReq);
+      Assert.assertNotNull(registerResponse);
+      lastResponseId = 0;
+
+      Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
+
+      // Allocate the first batch of containers, with sc1 active
+      SubClusterId subClusterId1 = SubClusterId.newInstance("SC-1");
+      registerSubCluster(subClusterId1);
+
+      int numberOfContainers = 3;
+      List<Container> containers =
+          getContainersAndAssert(numberOfContainers, numberOfContainers);
+      Assert.assertNotNull(containers);
+      Assert.assertEquals(3, containers.size());
+
+      // with sc2 active
+      SubClusterId subClusterId2 = SubClusterId.newInstance("SC-2");
+      registerSubCluster(subClusterId2);
+
+      // 1. The containers are already cached for SubCluster1; now try to cache the
+      // same containers for SubCluster2.
+      // Because SubCluster1 is still healthy at this point, each container should
+      // stay mapped to SubCluster1.
+      interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId2);
+      Map<ContainerId, SubClusterId> cIdToSCMap = interceptor.getContainerIdToSubClusterIdMap();
+      for (SubClusterId subClusterId : cIdToSCMap.values()) {
+        Assert.assertNotNull(subClusterId);
+        Assert.assertEquals(subClusterId1, subClusterId);
+      }
+
+      // 2. Deregister SubCluster1, then cache the same containers for SubCluster2.
+      // Each container should now be mapped to SubCluster2.
+      deRegisterSubCluster(subClusterId1);
+      interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId2);
+      Map<ContainerId, SubClusterId> cIdToSCMap2 = interceptor.getContainerIdToSubClusterIdMap();
+      for (SubClusterId subClusterId : cIdToSCMap2.values()) {
+        Assert.assertNotNull(subClusterId);
+        Assert.assertEquals(subClusterId2, subClusterId);
+      }
+
+      // 3. Deregister SubCluster2, then cache the same containers for SubCluster1.
+      // Because both SubCluster1 and SubCluster2 are unusable at this point,
+      // an exception is thrown while caching the first container.
+      deRegisterSubCluster(subClusterId2);
+      Container container1 = containers.get(0);
+      Assert.assertNotNull(container1);
+      String errMsg =
+          " Can't use any subCluster because an exception occurred" +
+          " ContainerId: " + container1.getId() +
+          " ApplicationId: " + interceptor.getAttemptId() +
+          " From RM: " + subClusterId1 + ". " +
+          " Previous Container was From subCluster: " + subClusterId2;
+
+      LambdaTestUtils.intercept(YarnRuntimeException.class, errMsg,
+          () -> interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId1));
+
+      // 4. Re-register SubCluster1, cache the containers again,
+      // and finish the application.
+      registerSubCluster(subClusterId1);
+      interceptor.cacheAllocatedContainersForSubClusterId(containers, subClusterId1);
+      releaseContainersAndAssert(containers);
+
+      // Finish the application
+      FinishApplicationMasterRequest finishReq =
+          Records.newRecord(FinishApplicationMasterRequest.class);
+      finishReq.setDiagnostics("");
+      finishReq.setTrackingUrl("");
+      finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+      FinishApplicationMasterResponse finishResponse =
+          interceptor.finishApplicationMaster(finishReq);
+      Assert.assertNotNull(finishResponse);
+      Assert.assertTrue(finishResponse.getIsUnregistered());
+
+      return null;
+    });
+  }
+
   @Test
   public void testBatchFinishApplicationMaster() throws IOException, InterruptedException {
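
A note on the assertion style used in step 3 of the new test: LambdaTestUtils.intercept(clazz, contained, callable) runs the callable, asserts that it throws the given exception type, checks that the exception's text contains the given substring, and returns the exception. Below is a minimal, self-contained usage sketch; the divide() helper is hypothetical and only exists to produce a predictable exception.

import org.apache.hadoop.test.LambdaTestUtils;

public class InterceptUsageSketch {

  // Hypothetical helper that fails for a zero divisor.
  static int divide(int a, int b) {
    return a / b;
  }

  public static void main(String[] args) throws Exception {
    // Expect an ArithmeticException whose text contains "/ by zero";
    // intercept() fails the test if no such exception is thrown.
    ArithmeticException ex = LambdaTestUtils.intercept(
        ArithmeticException.class, "/ by zero", () -> divide(1, 0));
    System.out.println("Caught as expected: " + ex.getMessage());
  }
}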