Преглед изворни кода

YARN-4096. App local logs are leaked if log aggregation fails to initialize for the app. Contributed by Jason Lowe.

(cherry picked from commit 16b9037dc1300b8bdbe54ba7cd47c53fe16e93d8)
Zhihai Xu пре 10 година
родитељ
комит
214338bd68

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -846,6 +846,9 @@ Release 2.7.2 - UNRELEASED
     YARN-4087. Followup fixes after YARN-2019 regarding RM behavior when
     state-store error occurs. (Jian He via xgong)
 
+    YARN-4096. App local logs are leaked if log aggregation fails to initialize
+    for the app. (Jason Lowe via zxu)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java

@@ -27,4 +27,6 @@ public interface AppLogAggregator extends Runnable {
   void abortLogAggregation();
 
   void finishLogAggregation();
+
+  void disableLogAggregation();
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -596,6 +596,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.notifyAll();
   }
 
+  @Override
+  public void disableLogAggregation() {
+    this.logAggregationDisabled = true;
+  }
+
   @Private
   @VisibleForTesting
   // This is only used for testing.

+ 9 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -363,19 +363,19 @@ public class LogAggregationService extends AbstractService implements
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
     // wait until check for existing aggregator to create dirs
+    YarnRuntimeException appDirException = null;
     try {
       // Create the app dir
       createAppDir(user, appId, userUgi);
     } catch (Exception e) {
-      appLogAggregators.remove(appId);
-      closeFileSystems(userUgi);
+      appLogAggregator.disableLogAggregation();
       if (!(e instanceof YarnRuntimeException)) {
-        e = new YarnRuntimeException(e);
+        appDirException = new YarnRuntimeException(e);
+      } else {
+        appDirException = (YarnRuntimeException)e;
       }
-      throw (YarnRuntimeException)e;
     }
 
-
     // TODO Get the user configuration for the list of containers that need log
     // aggregation.
 
@@ -391,6 +391,10 @@ public class LogAggregationService extends AbstractService implements
       }
     };
     this.threadPool.execute(aggregatorWrapper);
+
+    if (appDirException != null) {
+      throw appDirException;
+    }
   }
 
   protected void closeFileSystems(final UserGroupInformation userUgi) {

+ 12 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -731,9 +731,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-        
+
+    DeletionService spyDelSrvc = spy(this.delSrvc);
     LogAggregationService logAggregationService = spy(
-        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+        new LogAggregationService(dispatcher, this.context, spyDelSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
     logAggregationService.start();
@@ -741,6 +742,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
+
+    File appLogDir =
+        new File(localLogDir, ConverterUtils.toString(appId));
+    appLogDir.mkdir();
+
     Exception e = new RuntimeException("KABOOM!");
     doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
@@ -759,9 +765,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
-    // filesystems may have been instantiated
-    verify(logAggregationService).closeFileSystems(
-        any(UserGroupInformation.class));
 
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
@@ -774,6 +777,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
+    verify(spyDelSrvc).delete(eq(user), any(Path.class),
+        Mockito.<Path>anyVararg());
+    verify(logAggregationService).closeFileSystems(
+        any(UserGroupInformation.class));
   }
 
   private void writeContainerLogs(File appLogDir, ContainerId containerId,