소스 검색

YARN-3850. NM fails to read files from full disks which can lead to container logs being lost and other issues. Contributed by Varun Saxena
(cherry picked from commit 40b256949ad6f6e0dbdd248f2d257b05899f4332)

Jason Lowe 10 년 전
부모
커밋
baf9e22284

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -169,6 +169,9 @@ Release 2.7.1 - UNRELEASED
     YARN-3832. Resource Localization fails on a cluster due to existing cache
     directories (Brahma Reddy Battula via jlowe)
 
+    YARN-3850. NM fails to read files from full disks which can lead to
+    container logs being lost and other issues (Varun Saxena via jlowe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java

@@ -212,6 +212,18 @@ public class LocalDirsHandlerService extends AbstractService {
     return logDirs.getFullDirs();
   }
 
+  /**
+   * Function to get the local dirs which should be considered for reading
+   * existing files on disk. Contains the good local dirs and the local dirs
+   * that have reached the disk space limit
+   *
+   * @return the local dirs which should be considered for reading
+   */
+  public List<String> getLocalDirsForRead() {
+    return DirectoryCollection.concat(localDirs.getGoodDirs(),
+        localDirs.getFullDirs());
+  }
+
   /**
    * Function to get the local dirs which should be considered when cleaning up
    * resources. Contains the good local dirs and the local dirs that have reached
@@ -224,6 +236,18 @@ public class LocalDirsHandlerService extends AbstractService {
         localDirs.getFullDirs());
   }
 
+  /**
+   * Function to get the log dirs which should be considered for reading
+   * existing files on disk. Contains the good log dirs and the log dirs that
+   * have reached the disk space limit
+   *
+   * @return the log dirs which should be considered for reading
+   */
+  public List<String> getLogDirsForRead() {
+    return DirectoryCollection.concat(logDirs.getGoodDirs(),
+        logDirs.getFullDirs());
+  }
+
   /**
    * Function to get the log dirs which should be considered when cleaning up
    * resources. Contains the good log dirs and the log dirs that have reached

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java

@@ -121,7 +121,8 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
 
   private File locatePidFile(String appIdStr, String containerIdStr) {
     String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
-    for (String dir : getContext().getLocalDirsHandler().getLocalDirs()) {
+    for (String dir : getContext().getLocalDirsHandler().
+        getLocalDirsForRead()) {
       File pidFile = new File(dir, pidSubpath);
       if (pidFile.exists()) {
         return pidFile;

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java

@@ -554,10 +554,10 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         boolean appFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
-          + StringUtils.join(",", dirsHandler.getLogDirs()));
+          + StringUtils.join(",", dirsHandler.getLogDirsForRead()));
       final LogKey logKey = new LogKey(containerId);
       final LogValue logValue =
-          new LogValue(dirsHandler.getLogDirs(), containerId,
+          new LogValue(dirsHandler.getLogDirsForRead(), containerId,
             userUgi.getShortUserName(), logAggregationContext,
             this.uploadedFileMeta, appFinished);
       try {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsUtils.java

@@ -74,7 +74,7 @@ public class ContainerLogsUtils {
   
   static List<File> getContainerLogDirs(ContainerId containerId,
       LocalDirsHandlerService dirsHandler) throws YarnException {
-    List<String> logDirs = dirsHandler.getLogDirs();
+    List<String> logDirs = dirsHandler.getLogDirsForRead();
     List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
     for (String logDir : logDirs) {
       logDir = new File(logDir).toURI().getPath();

+ 40 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -172,22 +172,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     dispatcher.close();
   }
 
-  @Test
-  public void testLocalFileDeletionAfterUpload() throws Exception {
-    this.delSrvc = new DeletionService(createContainerExecutor());
-    delSrvc = spy(delSrvc);
-    this.delSrvc.init(conf);
-    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
-    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-        this.remoteRootLogDir.getAbsolutePath());
-    
-    LogAggregationService logAggregationService = spy(
-        new LogAggregationService(dispatcher, this.context, this.delSrvc,
-                                  super.dirsHandler));
+  private void verifyLocalFileDeletion(
+      LogAggregationService logAggregationService) throws Exception {
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
-    
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
@@ -247,9 +236,46 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
             ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
     };
 
-    checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+    checkEvents(appEventHandler, expectedEvents, true, "getType",
+        "getApplicationID");
   }
 
+  @Test
+  public void testLocalFileDeletionAfterUpload() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+                                  super.dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
+  @Test
+  public void testLocalFileDeletionOnDiskFull() throws Exception {
+    this.delSrvc = new DeletionService(createContainerExecutor());
+    delSrvc = spy(delSrvc);
+    this.delSrvc.init(conf);
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    List<String> logDirs = super.dirsHandler.getLogDirs();
+    LocalDirsHandlerService dirsHandler = spy(super.dirsHandler);
+    // Simulate disk being full by returning no good log dirs but having a
+    // directory in full log dirs.
+    when(dirsHandler.getLogDirs()).thenReturn(new ArrayList<String>());
+    when(dirsHandler.getLogDirsForRead()).thenReturn(logDirs);
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+            dirsHandler));
+    verifyLocalFileDeletion(logAggregationService);
+  }
+
+
   @Test
   public void testNoContainerOnNode() throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
 
@@ -29,6 +30,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,6 +117,24 @@ public class TestContainerLogsPage {
     Assert.assertNull(nmContext.getContainers().get(container1));
     files = ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
     Assert.assertTrue(!(files.get(0).toString().contains("file:")));
+
+    // Create a new context to check if correct container log dirs are fetched
+    // on full disk.
+    LocalDirsHandlerService dirsHandlerForFullDisk = spy(dirsHandler);
+    // good log dirs are empty and nm log dir is in the full log dir list.
+    when(dirsHandlerForFullDisk.getLogDirs()).
+        thenReturn(new ArrayList<String>());
+    when(dirsHandlerForFullDisk.getLogDirsForRead()).
+        thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
+    nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
+        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+    nmContext.getApplications().put(appId, app);
+    container.setState(ContainerState.RUNNING);
+    nmContext.getContainers().put(container1, container);
+    List<File> dirs =
+        ContainerLogsUtils.getContainerLogDirs(container1, user, nmContext);
+    File containerLogDir = new File(absLogDir, appId + "/" + container1);
+    Assert.assertTrue(dirs.contains(containerLogDir));
   }
   
   @Test(timeout = 10000)
@@ -224,7 +244,7 @@ public class TestContainerLogsPage {
     LocalDirsHandlerService localDirs = mock(LocalDirsHandlerService.class);
     List<String> logDirs = new ArrayList<String>();
     logDirs.add("F:/nmlogs");
-    when(localDirs.getLogDirs()).thenReturn(logDirs);
+    when(localDirs.getLogDirsForRead()).thenReturn(logDirs);
     
     ApplicationIdPBImpl appId = mock(ApplicationIdPBImpl.class);
     when(appId.toString()).thenReturn("app_id_1");