瀏覽代碼

YARN-11510. [Federation] Fix NodeManager#TestFederationInterceptor Flaky Unit Test. (#5733)

slfan1989 1 年之前
父節點
當前提交
5c02f21f2e

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -3075,6 +3075,10 @@ public class YarnConfiguration extends Configuration {
       + "amrmproxy.enabled";
       + "amrmproxy.enabled";
   public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
   public static final boolean DEFAULT_AMRM_PROXY_ENABLED = false;
 
 
+  public static final String AMRM_PROXY_WAIT_UAM_REGISTER_DONE =
+      NM_PREFIX + "amrmproxy.wait.uam-register.done";
+  public static final boolean DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE = false;
+
   public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
   public static final String AMRM_PROXY_ADDRESS = NM_PREFIX
       + "amrmproxy.address";
       + "amrmproxy.address";
   public static final int DEFAULT_AMRM_PROXY_PORT = 8049;
   public static final int DEFAULT_AMRM_PROXY_PORT = 8049;

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -5354,6 +5354,16 @@
     <value></value>
     <value></value>
   </property>
   </property>
 
 
+  <property>
+    <description>
+      Whether to wait for unmanaged AM (UAM) registration to complete.
+      The default value is false. If set to true, UAM registration
+      must finish before a container allocation is attempted.
+    </description>
+    <name>yarn.nodemanager.amrmproxy.wait.uam-register.done</name>
+    <value>false</value>
+  </property>
+
   <property>
   <property>
     <description>
     <description>
       YARN Federation supports Non-HA mode.
       YARN Federation supports Non-HA mode.

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java

@@ -18,6 +18,7 @@
 
 
 package org.apache.hadoop.yarn.server;
 package org.apache.hadoop.yarn.server;
 
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -183,7 +184,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
  * change the implementation with care.
  * change the implementation with care.
  */
  */
 public class MockResourceManagerFacade implements ApplicationClientProtocol,
 public class MockResourceManagerFacade implements ApplicationClientProtocol,
-    ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
+    ApplicationMasterProtocol, ResourceManagerAdministrationProtocol, Closeable {
 
 
   private static final Logger LOG =
   private static final Logger LOG =
       LoggerFactory.getLogger(MockResourceManagerFacade.class);
       LoggerFactory.getLogger(MockResourceManagerFacade.class);
@@ -967,4 +968,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
   public HashMap<ApplicationId, List<ContainerId>> getApplicationContainerIdMap() {
     return applicationContainerIdMap;
     return applicationContainerIdMap;
   }
   }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("MockResourceManagerFacade Close.");
+  }
 }
 }

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java

@@ -251,6 +251,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
   // the maximum wait time for the first async heart beat response
   // the maximum wait time for the first async heart beat response
   private long heartbeatMaxWaitTimeMs;
   private long heartbeatMaxWaitTimeMs;
 
 
+  private boolean waitUamRegisterDone;
+
   private MonotonicClock clock = new MonotonicClock();
   private MonotonicClock clock = new MonotonicClock();
 
 
   /**
   /**
@@ -353,6 +355,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       this.subClusterTimeOut =
       this.subClusterTimeOut =
           YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
           YarnConfiguration.DEFAULT_FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT;
     }
     }
+    this.waitUamRegisterDone = conf.getBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE,
+        YarnConfiguration.DEFAULT_AMRM_PROXY_WAIT_UAM_REGISTER_DONE);
   }
   }
 
 
   @Override
   @Override
@@ -1332,6 +1336,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
       });
       });
       this.uamRegisterFutures.put(scId, future);
       this.uamRegisterFutures.put(scId, future);
     }
     }
+
+    if (this.waitUamRegisterDone) {
+      for (Map.Entry<SubClusterId, Future<?>> entry : this.uamRegisterFutures.entrySet()) {
+        SubClusterId subClusterId = entry.getKey();
+        Future<?> future = entry.getValue();
+        while (!future.isDone()) {
+          LOG.info("subClusterId {} Wait Uam Register done.", subClusterId);
+        }
+      }
+    }
+
+
     return newSubClusters;
     return newSubClusters;
   }
   }
 
 

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java

@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
 import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -178,6 +179,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
     conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
     conf.setLong(YarnConfiguration.FEDERATION_AMRMPROXY_SUBCLUSTER_TIMEOUT,
         500);
         500);
 
 
+    // Wait for UAM registration to be done
+    conf.setBoolean(YarnConfiguration.AMRM_PROXY_WAIT_UAM_REGISTER_DONE, true);
+
     return conf;
     return conf;
   }
   }
 
 
@@ -593,6 +597,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         interceptor.recover(recoveredDataMap);
         interceptor.recover(recoveredDataMap);
 
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Waiting for SC-1 to time out.
+        GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
+
         // SC1 should be initialized to be timed out
         // SC1 should be initialized to be timed out
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
 
 
@@ -851,7 +859,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         List<Container> containers =
         List<Container> containers =
             getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
             getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
         for (Container c : containers) {
         for (Container c : containers) {
-          LOG.info("Allocated container " + c.getId());
+          LOG.info("Allocated container {}", c.getId());
         }
         }
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
 
 
@@ -885,6 +893,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
         int numberOfContainers = 3;
         int numberOfContainers = 3;
         // Should re-attach secondaries and get the three running containers
         // Should re-attach secondaries and get the three running containers
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+
+        // Waiting for SC-1 to time out.
+        GenericTestUtils.waitFor(() -> interceptor.getTimedOutSCs(true).size() == 1, 100, 1000);
+
         // SC1 should be initialized to be timed out
         // SC1 should be initialized to be timed out
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
         Assert.assertEquals(numberOfContainers,
         Assert.assertEquals(numberOfContainers,