Selaa lähdekoodia

YARN-4756. Unnecessary wait in Node Status Updater during reboot. (Eric Badger via kasha)

Karthik Kambatla 9 vuotta sitten
vanhempi
commit
e82f961a39

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -284,6 +284,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         return;
       }
       this.isStopped = true;
+      sendOutofBandHeartBeat();
       try {
         statusUpdater.join();
         registerWithRM();

+ 22 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -108,6 +108,7 @@ public class TestNodeManagerResync {
   static final String user = "nobody";
   private FileContext localFS;
   private CyclicBarrier syncBarrier;
+  private CyclicBarrier updateBarrier;
   private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
   private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
   private final NodeManagerEvent resyncEvent =
@@ -125,6 +126,7 @@ public class TestNodeManagerResync {
     remoteLogsDir.mkdirs();
     nmLocalDir.mkdirs();
     syncBarrier = new CyclicBarrier(2);
+    updateBarrier = new CyclicBarrier(2);
   }
 
   @After
@@ -803,9 +805,11 @@ public class TestNodeManagerResync {
                 .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
             assertEquals(Resource.newInstance(1024, 1),
                 containerStatus.getCapability());
+            updateBarrier.await();
             // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
             // This function should be synchronized with
             // increaseContainersResource().
+            updateBarrier.await();
             super.rebootNodeStatusUpdaterAndRegisterWithRM();
             // Check status after registerWithRM
             containerStatus = getContainerManager()
@@ -831,17 +835,24 @@ public class TestNodeManagerResync {
         List<Token> increaseTokens = new ArrayList<Token>();
         // Add increase request.
         Resource targetResource = Resource.newInstance(4096, 2);
-        try {
-          increaseTokens.add(getContainerToken(targetResource));
-          IncreaseContainersResourceRequest increaseRequest =
-              IncreaseContainersResourceRequest.newInstance(increaseTokens);
-          IncreaseContainersResourceResponse increaseResponse =
-              getContainerManager()
-                  .increaseContainersResource(increaseRequest);
-          Assert.assertEquals(
-              1, increaseResponse.getSuccessfullyIncreasedContainers()
-                  .size());
-          Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
+        try{
+          try {
+            updateBarrier.await();
+            increaseTokens.add(getContainerToken(targetResource));
+            IncreaseContainersResourceRequest increaseRequest =
+                IncreaseContainersResourceRequest.newInstance(increaseTokens);
+            IncreaseContainersResourceResponse increaseResponse =
+                getContainerManager()
+                    .increaseContainersResource(increaseRequest);
+            Assert.assertEquals(
+                1, increaseResponse.getSuccessfullyIncreasedContainers()
+                    .size());
+            Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
+          } catch (Exception e) {
+            e.printStackTrace();
+          } finally {
+            updateBarrier.await();
+          }
         } catch (Exception e) {
           e.printStackTrace();
         }