浏览代码

svn merge -c 1297796 from trunk to branch-0.23.2 FIXES: MAPREDUCE-3977. LogAggregationService leaks log aggregator objects (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.2@1297798 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 年之前
父节点
当前提交
48df13b305

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -154,6 +154,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3961. Map/ReduceSlotMillis computation incorrect (Siddharth Seth
     via bobby)
 
+    MAPREDUCE-3977. LogAggregationService leaks log aggregator objects
+    (Jason Lowe via bobby)
+
 Release 0.23.1 - 2012-02-17
 
   INCOMPATIBLE CHANGES

+ 18 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -139,12 +140,6 @@ public class LogAggregationService extends AbstractService implements
     super.stop();
   }
   
-  
-
-  
-
-
-  
   private void verifyAndCreateRemoteLogDir(Configuration conf) {
     // Checking the existance of the TLD
     FileSystem remoteFS = null;
@@ -289,7 +284,7 @@ public class LogAggregationService extends AbstractService implements
     createAppDir(user, appId, userUgi);
 
     // New application
-    AppLogAggregator appLogAggregator =
+    final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
@@ -303,7 +298,22 @@ public class LogAggregationService extends AbstractService implements
     // aggregation.
 
     // Schedule the aggregator.
-    this.threadPool.execute(appLogAggregator);
+    Runnable aggregatorWrapper = new Runnable() {
+      public void run() {
+        try {
+          appLogAggregator.run();
+        } finally {
+          appLogAggregators.remove(appId);
+        }
+      }
+    };
+    this.threadPool.execute(aggregatorWrapper);
+  }
+
+  // for testing only
+  @Private
+  int getNumAggregators() {
+    return this.appLogAggregators.size();
   }
 
   private void stopContainer(ContainerId containerId, int exitCode) {

+ 34 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -565,4 +565,38 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLogAggregatorCleanup() throws Exception {
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    // get the AppLogAggregationImpl thread to crash
+    LocalDirsHandlerService mockedDirSvc = mock(LocalDirsHandlerService.class);
+
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, this.context, delSrvc,
+                                  mockedDirSvc);
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
+    logAggregationService.handle(new LogHandlerAppStartedEvent(
+            application1, this.user, null,
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(application1));
+    dispatcher.await();
+    int timeToWait = 20 * 1000;
+    while (timeToWait > 0 && logAggregationService.getNumAggregators() > 0) {
+      Thread.sleep(100);
+      timeToWait -= 100;
+    }
+    Assert.assertEquals("Log aggregator failed to cleanup!", 0,
+        logAggregationService.getNumAggregators());
+  }
 }