Ver código fonte

YARN-11241. Add uncleaning option for local app log file with log-aggregation enabled (#4703)

Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 65a027b11299ac2d57556406a614442d8fc9acd4)
Ashutosh Gupta 2 anos atrás
pai
commit
2532eca013

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -1514,6 +1514,13 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS
       = 10 * 60 * 1000;
 
+  /**
+   * Whether to clean up nodemanager logs when log aggregation is enabled.
+   */
+  public static final String LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP =
+      YARN_PREFIX + "log-aggregation.enable-local-cleanup";
+  public static final boolean DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP = true;
+
   /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if Log
    * aggregation is disabled

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1516,6 +1516,15 @@
     <value>600000</value>
   </property>
 
+  <property>
+    <description>Whether to clean up nodemanager logs when log aggregation is enabled. Setting to
+      false disables cleanup of the local nodemanager logs, which will eventually fill the disk.
+      Set it to false only for testing purposes.
+    </description>
+    <name>yarn.log-aggregation.enable-local-cleanup</name>
+    <value>true</value>
+  </property>
+
   <property>
     <description>Time in seconds to retain user logs. Only applicable if
     log aggregation is disabled

+ 27 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -86,6 +86,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final Dispatcher dispatcher;
   private final ApplicationId appId;
   private final String applicationId;
+  private final boolean enableLocalCleanup;
   private boolean logAggregationDisabled = false;
   private final Configuration conf;
   private final DeletionService delService;
@@ -172,6 +173,13 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
+    this.enableLocalCleanup =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    if (!this.enableLocalCleanup) {
+      LOG.warn("{} is only for testing and not for any production system ",
+          YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    }
     this.logAggPolicy = getLogAggPolicy(conf);
     this.recoveredLogInitedTime = recoveredLogInitedTime;
     this.logFileSizeThreshold =
@@ -337,26 +345,26 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             appFinished, finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
-          LOG.trace("Uploaded the following files for {}: {}",
-              container, uploadedFilePathsInThisCycle.toString());
-          List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
-          uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
-          if (LOG.isDebugEnabled()) {
-            for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
-              try {
-                long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
-                if (fileSize >= logFileSizeThreshold) {
-                  LOG.debug("Log File " + uploadedFilePath
-                      + " size is " + fileSize + " bytes");
+          if (enableLocalCleanup) {
+            LOG.trace("Uploaded the following files for {}: {}", container,
+                uploadedFilePathsInThisCycle.toString());
+            List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
+            uploadedFilePathsInThisCycleList.addAll(uploadedFilePathsInThisCycle);
+            if (LOG.isDebugEnabled()) {
+              for (Path uploadedFilePath : uploadedFilePathsInThisCycleList) {
+                try {
+                  long fileSize = lfs.getFileStatus(uploadedFilePath).getLen();
+                  if (fileSize >= logFileSizeThreshold) {
+                    LOG.debug("Log File " + uploadedFilePath + " size is " + fileSize + " bytes");
+                  }
+                } catch (Exception e1) {
+                  LOG.error("Failed to get log file size " + e1);
                 }
-              } catch (Exception e1) {
-                LOG.error("Failed to get log file size " + e1);
               }
             }
+            deletionTask = new FileDeletionTask(delService, this.userUgi.getShortUserName(), null,
+                uploadedFilePathsInThisCycleList);
           }
-          deletionTask = new FileDeletionTask(delService,
-              this.userUgi.getShortUserName(), null,
-              uploadedFilePathsInThisCycleList);
         }
 
         // This container is finished, and all its logs have been uploaded,
@@ -528,6 +536,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   }
 
   private void doAppLogAggregationPostCleanUp() {
+    if (!enableLocalCleanup) {
+      return;
+    }
     // Remove the local app-log-dirs
     List<Path> localAppLogDirs = new ArrayList<Path>();
     for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {

+ 53 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -234,31 +234,47 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     // ensure filesystems were closed
     verify(logAggregationService).closeFileSystems(
         any(UserGroupInformation.class));
-    List<Path> dirList = new ArrayList<>();
-    dirList.add(new Path(app1LogDir.toURI()));
-    verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
-        delSrvc, user, null, dirList)));
-    
-    String containerIdStr = container11.toString();
-    File containerLogDir = new File(app1LogDir, containerIdStr);
-    int count = 0;
-    int maxAttempts = 50;
-    for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
-      File f = new File(containerLogDir, fileType);
-      count = 0;
-      while ((f.exists()) && (count < maxAttempts)) {
-        count++;
-        Thread.sleep(100);
+    boolean filesShouldBeDeleted =
+        this.conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP);
+    if (filesShouldBeDeleted) {
+      List<Path> dirList = new ArrayList<>();
+      dirList.add(new Path(app1LogDir.toURI()));
+      verify(delSrvc, times(2)).delete(argThat(new FileDeletionMatcher(
+          delSrvc, user, null, dirList)));
+
+      String containerIdStr = container11.toString();
+      File containerLogDir = new File(app1LogDir, containerIdStr);
+      int count = 0;
+      int maxAttempts = 50;
+      for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+        File f = new File(containerLogDir, fileType);
+        count = 0;
+        while ((f.exists()) && (count < maxAttempts)) {
+          count++;
+          Thread.sleep(100);
+        }
+        Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
       }
-      Assert.assertFalse("File [" + f + "] was not deleted", f.exists());
-    }
-    count = 0;
-    while ((app1LogDir.exists()) && (count < maxAttempts)) {
-      count++;
-      Thread.sleep(100);
+      Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
+          app1LogDir.exists());
+    } else {
+      List<Path> dirList = new ArrayList<>();
+      dirList.add(new Path(app1LogDir.toURI()));
+      verify(delSrvc, never()).delete(argThat(new FileDeletionMatcher(
+          delSrvc, user, null, dirList)));
+
+      String containerIdStr = container11.toString();
+      File containerLogDir = new File(app1LogDir, containerIdStr);
+      Thread.sleep(5000);
+      for (String fileType : new String[]{"stdout", "stderr", "syslog"}) {
+        File f = new File(containerLogDir, fileType);
+        Assert.assertTrue("File [" + f + "] was not deleted", f.exists());
+      }
+      Assert.assertTrue("Directory [" + app1LogDir + "] was not deleted",
+          app1LogDir.exists());
     }
-    Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
-      app1LogDir.exists());
+    delSrvc.stop();
 
     Path logFilePath = logAggregationService
         .getLogAggregationFileController(conf)
@@ -297,6 +313,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     verifyLocalFileDeletion(logAggregationService);
   }
 
+  @Test
+  public void testLocalFileRemainsAfterUploadOnCleanupDisable() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLE_LOCAL_CLEANUP, false);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc, super.dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
   @Test
   public void testLocalFileDeletionOnDiskFull() throws Exception {
     this.delSrvc = new DeletionService(createContainerExecutor());