Browse Source

YARN-8451. Multiple NM heartbeat threads created when a slow NM resyncs with RM. Contributed by Botong Huang

(cherry picked from commit 100470140d86eede0fa240a9aa93226f274ee4f5)
Jason Lowe 7 years ago
parent
commit
0317961ec4

+ 42 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -128,6 +128,7 @@ public class NodeManager extends CompositeService
   // the NM collector service is set only if the timeline service v.2 is enabled
   private NMCollectorService nmCollectorService;
   private NodeStatusUpdater nodeStatusUpdater;
+  private AtomicBoolean resyncingWithRM = new AtomicBoolean(false);
   private NodeResourceMonitor nodeResourceMonitor;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook;
   private NMStateStoreService nmStore = null;
@@ -371,7 +372,7 @@ public class NodeManager extends CompositeService
     addService(del);
 
     // NodeManager level dispatcher
-    this.dispatcher = new AsyncDispatcher("NM Event dispatcher");
+    this.dispatcher = createNMDispatcher();
 
     dirsHandler = new LocalDirsHandlerService(metrics);
     nodeHealthChecker =
@@ -492,31 +493,41 @@ public class NodeManager extends CompositeService
   }
 
   protected void resyncWithRM() {
-    //we do not want to block dispatcher thread here
-    new Thread() {
-      @Override
-      public void run() {
-        try {
-          if (!rmWorkPreservingRestartEnabled) {
-            LOG.info("Cleaning up running containers on resync");
-            containerManager.cleanupContainersOnNMResync();
-            // Clear all known collectors for resync.
-            if (context.getKnownCollectors() != null) {
-              context.getKnownCollectors().clear();
+    // Run the resync on its own thread so the dispatcher thread is not
+    // blocked. The resyncingWithRM flag (an atomic test-and-set, not a lock)
+    // ensures that at most one resync thread is running at a time.
+    if (this.resyncingWithRM.getAndSet(true)) {
+      // The flag was already set: a resync thread is running, so do nothing
+    } else {
+      // We flipped the flag from false to true, so start the resync thread
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            if (!rmWorkPreservingRestartEnabled) {
+              LOG.info("Cleaning up running containers on resync");
+              containerManager.cleanupContainersOnNMResync();
+              // Clear all known collectors for resync.
+              if (context.getKnownCollectors() != null) {
+                context.getKnownCollectors().clear();
+              }
+            } else {
+              LOG.info("Preserving containers on resync");
+              // Re-register known timeline collectors.
+              reregisterCollectors();
             }
-          } else {
-            LOG.info("Preserving containers on resync");
-            // Re-register known timeline collectors.
-            reregisterCollectors();
+            ((NodeStatusUpdaterImpl) nodeStatusUpdater)
+                .rebootNodeStatusUpdaterAndRegisterWithRM();
+          } catch (YarnRuntimeException e) {
+            LOG.error("Error while rebooting NodeStatusUpdater.", e);
+            shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
+          } finally {
+            // Clear the flag so that a later resync request can start a thread
+            resyncingWithRM.set(false);
           }
-          ((NodeStatusUpdaterImpl) nodeStatusUpdater)
-            .rebootNodeStatusUpdaterAndRegisterWithRM();
-        } catch (YarnRuntimeException e) {
-          LOG.error("Error while rebooting NodeStatusUpdater.", e);
-          shutDown(NodeManagerStatus.EXCEPTION.getExitCode());
         }
-      }
-    }.start();
+      }.start();
+    }
   }
 
   /**
@@ -864,7 +875,14 @@ public class NodeManager extends CompositeService
   ContainerManagerImpl getContainerManager() {
     return containerManager;
   }
-  
+
+  /**
+   * Creates the NM event dispatcher; tests override this to inject their own.
+   */
+  protected AsyncDispatcher createNMDispatcher() {
+    return new AsyncDispatcher("NM Event dispatcher");
+  }
+
   //For testing
   Dispatcher getNMDispatcher(){
     return dispatcher;

+ 56 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -37,6 +37,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
@@ -64,7 +65,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -107,6 +110,7 @@ public class TestNodeManagerResync {
   private FileContext localFS;
   private CyclicBarrier syncBarrier;
   private CyclicBarrier updateBarrier;
+  private AtomicInteger resyncThreadCount;
   private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
   private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
   private final NodeManagerEvent resyncEvent =
@@ -125,6 +129,7 @@ public class TestNodeManagerResync {
     nmLocalDir.mkdirs();
     syncBarrier = new CyclicBarrier(2);
     updateBarrier = new CyclicBarrier(2);
+    resyncThreadCount = new AtomicInteger(0);
   }
 
   @After
@@ -185,6 +190,41 @@ public class TestNodeManagerResync {
     }
   }
 
+  @SuppressWarnings("resource")
+  @Test(timeout = 30000)
+  public void testNMMultipleResyncEvent()
+      throws IOException, InterruptedException {
+    TestNodeManager1 nm = new TestNodeManager1(false);
+    YarnConfiguration conf = createNMConfig();
+
+    int resyncEventCount = 4; // resync events fired back to back below
+    try {
+      nm.init(conf);
+      nm.start();
+      Assert.assertEquals(1, nm.getNMRegistrationCount());
+      for (int i = 0; i < resyncEventCount; i++) {
+        nm.getNMDispatcher().getEventHandler().handle(resyncEvent);
+      }
+
+      DrainDispatcher dispatcher = (DrainDispatcher) nm.getNMDispatcher();
+      dispatcher.await();
+      LOG.info("NM dispatcher drained");
+
+      // Wait for the resync thread to finish
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) { // NOTE(review): silently ignored
+      }
+      LOG.info("Barrier wait done for the resync thread");
+
+      // Initial registration + a single resync => 2 registrations in total
+      Assert.assertEquals(2, nm.getNMRegistrationCount());
+      Assert.assertFalse("NM shutdown called.", isNMShutdownCalled.get());
+    } finally {
+      nm.stop();
+    }
+  }
+
   @SuppressWarnings("resource")
   @Test(timeout=10000)
   public void testNMshutdownWhenResyncThrowException() throws IOException,
@@ -399,6 +439,11 @@ public class TestNodeManagerResync {
       existingCid = cId;
     }
 
+    @Override
+    protected AsyncDispatcher createNMDispatcher() {
+      return new DrainDispatcher(); // lets tests await() the NM dispatcher
+    }
+
     @Override
     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
@@ -410,6 +455,14 @@ public class TestNodeManagerResync {
       return registrationCount;
     }
 
+    @Override
+    protected void shutDown(int exitCode) { // only records the call
+      synchronized (isNMShutdownCalled) {
+        isNMShutdownCalled.set(true);
+        isNMShutdownCalled.notify(); // wake one waiter on the flag, if any
+      }
+    }
+
     class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
 
       public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
@@ -428,6 +481,9 @@ public class TestNodeManagerResync {
         ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
         .containermanager.container.Container> containers =
             getNMContext().getContainers();
+        if (resyncThreadCount.incrementAndGet() > 1) {
+          throw new YarnRuntimeException("Multiple resync thread created!");
+        }
         try {
           try {
             if (containersShouldBePreserved) {