1
0
فهرست منبع

YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher. Contributed by Rohith

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618915 13f79535-47bb-0310-9956-ffa450edef68
Jian He 11 سال پیش
والد
کامیت
375c221960

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -211,6 +211,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2397. Avoided loading two authentication filters for RM and TS web
     YARN-2397. Avoided loading two authentication filters for RM and TS web
     interfaces. (Varun Vasudev via zjshen)
     interfaces. (Varun Vasudev via zjshen)
 
 
+    YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
+    (Rohith via jianhe)
+
 Release 2.5.0 - UNRELEASED
 Release 2.5.0 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -1161,6 +1161,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
     ((Service)dispatcher).init(this.conf);
     ((Service)dispatcher).init(this.conf);
     ((Service)dispatcher).start();
     ((Service)dispatcher).start();
     removeService((Service)rmDispatcher);
     removeService((Service)rmDispatcher);
+    // Need to stop previous rmDispatcher before assigning new dispatcher
+    // otherwise causes "AsyncDispatcher event handler" thread leak
+    ((Service) rmDispatcher).stop();
     rmDispatcher = dispatcher;
     rmDispatcher = dispatcher;
     addIfService(rmDispatcher);
     addIfService(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);
     rmContext.setDispatcher(rmDispatcher);

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java

@@ -331,6 +331,10 @@ public class TestRMHA {
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
+    
+    MyCountingDispatcher dispatcher =
+        (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    assertTrue(!dispatcher.isStopped());
 
 
     rm.adminService.transitionToActive(requestInfo);
     rm.adminService.transitionToActive(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
@@ -339,6 +343,11 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
         rm.getServices().size());
 
 
+    
+    // Keep the dispatcher reference before transitioning to standby
+    dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
+    
+    
     rm.adminService.transitionToStandby(requestInfo);
     rm.adminService.transitionToStandby(requestInfo);
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
     assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
         ((MyCountingDispatcher) rm.getRMContext().getDispatcher())
@@ -346,6 +355,8 @@ public class TestRMHA {
     assertEquals(errorMessageForService, expectedServiceCount,
     assertEquals(errorMessageForService, expectedServiceCount,
         rm.getServices().size());
         rm.getServices().size());
 
 
+    assertTrue(dispatcher.isStopped());
+    
     rm.stop();
     rm.stop();
   }
   }
 
 
@@ -492,6 +503,8 @@ public class TestRMHA {
 
 
     private int eventHandlerCount;
     private int eventHandlerCount;
 
 
+    private volatile boolean stopped = false;
+
     public MyCountingDispatcher() {
     public MyCountingDispatcher() {
       super("MyCountingDispatcher");
       super("MyCountingDispatcher");
       this.eventHandlerCount = 0;
       this.eventHandlerCount = 0;
@@ -510,5 +523,15 @@ public class TestRMHA {
     public int getEventHandlerCount() {
     public int getEventHandlerCount() {
       return this.eventHandlerCount;
       return this.eventHandlerCount;
     }
     }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      this.stopped = true;
+      super.serviceStop();
+    }
+
+    public boolean isStopped() {
+      return this.stopped;
+    }
   }
   }
 }
 }