Ver código fonte

svn merge -c 1460808 FIXES: YARN-71. Fix the NodeManager to clean up local-dirs on restart. Contributed by Xuan Gong

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1469686 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 anos atrás
pai
commit
d6dc577bb7

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -15,6 +15,9 @@ Release 0.23.8 - UNRELEASED
     YARN-476. ProcfsBasedProcessTree info message confuses users. 
     (sandyr via tucu)
 
+    YARN-71. Fix the NodeManager to clean up local-dirs on restart.
+    (Xuan Gong via sseth)
+
 Release 0.23.7 - 2013-04-18
 
   INCOMPATIBLE CHANGES

+ 12 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NodeManager extends CompositeService 
     implements EventHandler<NodeManagerEvent> {
 
@@ -113,6 +115,10 @@ public class NodeManager extends CompositeService
     return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
   }
 
+  protected DeletionService createDeletionService(ContainerExecutor exec) {
+    return new DeletionService(exec);
+  }
+
   protected void doSecureLogin() throws IOException {
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
         YarnConfiguration.NM_PRINCIPAL);
@@ -143,7 +149,7 @@ public class NodeManager extends CompositeService
     } catch (IOException e) {
       throw new YarnException("Failed to initialize container executor", e);
     }    
-    DeletionService del = new DeletionService(exec);
+    DeletionService del = createDeletionService(exec);
     addService(del);
 
     // NodeManager level dispatcher
@@ -351,6 +357,11 @@ public class NodeManager extends CompositeService
     return containerManager;
   }
 
+  @VisibleForTesting
+  Context getNMContext() {
+    return this.context;
+  }
+
   public static void main(String[] args) {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);

+ 78 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -53,8 +54,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
@@ -175,9 +178,11 @@ public class ResourceLocalizationService extends CompositeService
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
-      // TODO queue deletions here, rather than NM init?
       FileContext lfs = getLocalFileContext(conf);
       lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
+
+      cleanUpLocalDir(lfs,delService);
+
       List<String> localDirs = dirsHandler.getLocalDirs();
       for (String localDir : localDirs) {
         // $local/usercache
@@ -926,4 +931,76 @@ public class ResourceLocalizationService extends CompositeService
 
   }
 
+  private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
+    long currentTimeStamp = System.currentTimeMillis();
+    for (String localDir : dirsHandler.getLocalDirs()) {
+      renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
+          currentTimeStamp);
+      renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
+          currentTimeStamp);
+      renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
+          currentTimeStamp);
+      try {
+        deleteLocalDir(lfs, del, localDir);
+      } catch (IOException e) {
+        // Do nothing, just give the warning
+        LOG.warn("Failed to delete localDir: " + localDir);
+      }
+    }
+  }
+
+  private void renameLocalDir(FileContext lfs, String localDir,
+      String localSubDir, long currentTimeStamp) {
+    try {
+      lfs.rename(new Path(localDir, localSubDir), new Path(
+          localDir, localSubDir + "_DEL_" + currentTimeStamp));
+    } catch (FileNotFoundException ex) {
+      // No need to handle this exception
+      // localSubDir may not be exist
+    } catch (Exception ex) {
+      // Do nothing, just give the warning
+      LOG.warn("Failed to rename the local file under " +
+          localDir + "/" + localSubDir);
+    }
+  }
+
+  private void deleteLocalDir(FileContext lfs, DeletionService del,
+      String localDir) throws IOException {
+    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(new Path(localDir));
+    if (fileStatus != null) {
+      while (fileStatus.hasNext()) {
+        FileStatus status = fileStatus.next();
+        try {
+          if (status.getPath().getName().matches(".*" +
+              ContainerLocalizer.USERCACHE + "_DEL_.*")) {
+            cleanUpFilesFromSubDir(lfs, del, status.getPath());
+          } else if (status.getPath().getName()
+              .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*")
+              ||
+              status.getPath().getName()
+                  .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) {
+            del.delete(null, status.getPath(), new Path[] {});
+          }
+        } catch (IOException ex) {
+          // Do nothing, just give the warning
+          LOG.warn("Failed to delete this local Directory: " +
+              status.getPath().getName());
+        }
+      }
+    }
+  }
+
+  private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del,
+      Path dirPath) throws IOException {
+    RemoteIterator<FileStatus> fileStatus = lfs.listStatus(dirPath);
+    if (fileStatus != null) {
+      while (fileStatus.hasNext()) {
+        FileStatus status = fileStatus.next();
+        String owner = status.getOwner();
+        del.delete(owner, status.getPath(), new Path[] {});
+      }
+    }
+    del.delete(null, dirPath, new Path[] {});
+  }
+
 }

+ 297 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java

@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestNodeManagerReboot {
+
+  static final File basedir =
+      new File("target", TestNodeManagerReboot.class.getName());
+  static final File logsDir = new File(basedir, "logs");
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File localResourceDir = new File(basedir, "resource");
+
+  static final String user = System.getProperty("user.name");
+  private FileContext localFS;
+
+  private MyNodeManager nm;
+  private DeletionService delService;
+  static final Log LOG = LogFactory.getLog(TestNodeManagerReboot.class);
+
+  @Before
+  public void setup() throws UnsupportedFileSystemException {
+    localFS = FileContext.getLocalFSFileContext();
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    localFS.delete(new Path(basedir.getPath()), true);
+    if (nm != null) {
+      nm.stop();
+    }
+  }
+
+  @Test(timeout = 20000)
+  public void testClearLocalDirWhenNodeReboot() throws IOException {
+    nm = new MyNodeManager();
+    nm.start();
+    // create files under fileCache
+    createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100);
+    localResourceDir.mkdirs();
+    ContainerManagerImpl containerManager = nm.getContainerManager();
+
+    ContainerLaunchContext containerLaunchContext =
+        Records.newRecord(ContainerLaunchContext.class);
+    // Construct the Container-id
+    ContainerId cId = createContainerId();
+    containerLaunchContext.setContainerId(cId);
+
+    containerLaunchContext.setUser(user);
+
+    URL localResourceUri =
+        ConverterUtils.getYarnUrlFromPath(localFS
+            .makeQualified(new Path(localResourceDir.getAbsolutePath())));
+
+    LocalResource localResource =
+        Records.newRecord(LocalResource.class);
+    localResource.setResource(localResourceUri);
+    localResource.setSize(-1);
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResource.setType(LocalResourceType.FILE);
+    localResource.setTimestamp(localResourceDir.lastModified());
+    String destinationFile = "dest_file";
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+    localResources.put(destinationFile, localResource);
+    containerLaunchContext.setLocalResources(localResources);
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    List<String> commands = new ArrayList<String>();
+    containerLaunchContext.setCommands(commands);
+    containerLaunchContext.setResource(Records
+        .newRecord(Resource.class));
+    containerLaunchContext.getResource().setMemory(1024);
+    StartContainerRequest startRequest =
+        Records.newRecord(StartContainerRequest.class);
+    startRequest.setContainerLaunchContext(containerLaunchContext);
+    containerManager.startContainer(startRequest);
+
+    GetContainerStatusRequest request =
+        Records.newRecord(GetContainerStatusRequest.class);
+    request.setContainerId(cId);
+    Container container =
+        nm.getNMContext().getContainers().get(request.getContainerId());
+
+    final int MAX_TRIES = 20;
+    int numTries = 0;
+    while (!container.getContainerState().equals(ContainerState.DONE)
+        && numTries <= MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {
+        // Do nothing
+      }
+      numTries++;
+    }
+
+    Assert.assertEquals(ContainerState.DONE, container.getContainerState());
+
+    Assert.assertTrue(
+        "The container should create a subDir named currentUser: " + user +
+            "under localDir/usercache",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+            ContainerLocalizer.USERCACHE) > 0);
+
+    Assert.assertTrue("There should be files or Dirs under nm_private when " +
+        "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+        ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
+
+    nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+
+    numTries = 0;
+    while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
+        .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+        ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir
+        .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
+        > 0) && numTries < MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {
+        // Do nothing
+      }
+      numTries++;
+    }
+
+    Assert.assertTrue("After NM reboots, all local files should be deleted",
+        numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
+            .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
+            ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir
+            .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR)
+              == 0);
+    verify(delService, times(1)).delete(eq(user),
+        argThat(new PathInclude(user)));
+    verify(delService, times(1)).delete(
+        (String) isNull(),
+        argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
+            + "_DEL_")));
+    verify(delService, times(1)).delete((String) isNull(),
+        argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_")));
+    verify(delService, times(1)).delete((String) isNull(),
+        argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_")));
+
+  }
+
+  private int numOfLocalDirs(String localDir, String localSubDir) {
+    File[] listOfFiles = new File(localDir, localSubDir).listFiles();
+    if (listOfFiles == null) {
+      return 0;
+    } else {
+      return listOfFiles.length;
+    }
+  }
+
+  private void createFiles(String dir, String subDir, int numOfFiles) {
+    for (int i = 0; i < numOfFiles; i++) {
+      File newFile = new File(dir + "/" + subDir, "file_" + (i + 1));
+      try {
+        newFile.createNewFile();
+      } catch (IOException e) {
+        // Do nothing
+      }
+    }
+  }
+
+  private ContainerId createContainerId() {
+    ApplicationId appId = Records.newRecord(ApplicationId.class);
+    appId.setClusterTimestamp(0);
+    appId.setId(0);
+    ApplicationAttemptId appAttemptId =
+        Records.newRecord(ApplicationAttemptId.class);
+    appAttemptId.setApplicationId(appId);
+    appAttemptId.setAttemptId(1);
+    ContainerId containerId =
+        Records.newRecord(ContainerId.class);
+    containerId.setApplicationAttemptId(appAttemptId);
+    return containerId;
+  }
+
+  private class MyNodeManager extends NodeManager {
+
+    public MyNodeManager() {
+      super();
+      this.init(createNMConfig());
+    }
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater(
+          context, dispatcher, healthChecker, metrics);
+      return myNodeStatusUpdater;
+    }
+
+    @Override
+    protected DeletionService createDeletionService(ContainerExecutor exec) {
+      delService = spy(new DeletionService(exec));
+      return delService;
+    }
+
+    // mimic part of reboot process
+    @Override
+    public void handle(NodeManagerEvent event) {
+      switch (event.getType()) {
+        case SHUTDOWN:
+          this.stop();
+          break;
+        case REBOOT:
+          this.stop();
+          this.createNewMyNodeManager().start();
+          break;
+        default:
+          LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
+      }
+    }
+
+    private MyNodeManager createNewMyNodeManager() {
+      return new MyNodeManager();
+    }
+
+    private YarnConfiguration createNMConfig() {
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
+      conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+      conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+      conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+      return conf;
+    }
+  }
+
+  class PathInclude extends ArgumentMatcher<Path> {
+
+    final String part;
+
+    PathInclude(String part) {
+      this.part = part;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      return ((Path) o).getName().indexOf(part) != -1;
+    }
+  }
+}