
Merge trunk into HA branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1245834 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 13 years ago
parent
commit
ef5d7156db
19 changed files with 330 additions and 196 deletions
  1. + 3 - 0     hadoop-common-project/hadoop-common/CHANGES.txt
  2. + 3 - 3     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java
  3. + 2 - 2     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
  4. + 5 - 39    hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java
  5. + 1 - 1     hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
  6. + 13 - 23   hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java
  7. + 5 - 0     hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  8. + 1 - 1     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java
  9. + 3 - 2     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  10. + 11 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
  11. + 1 - 1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java
  12. + 56 - 0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java
  13. + 36 - 2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
  14. + 5 - 0    hadoop-mapreduce-project/CHANGES.txt
  15. + 66 - 58  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
  16. + 62 - 52  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java
  17. + 3 - 3    hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java
  18. + 16 - 1   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
  19. + 38 - 8   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -201,6 +201,9 @@ Release 0.23.2 - UNRELEASED
 
     HADOOP-8083 javadoc generation for some modules is not done under target/ (tucu)
 
+    HADOOP-8036. TestViewFsTrash assumes the user's home directory is
+    2 levels deep. (Colin Patrick McCabe via eli)
+
 Release 0.23.1 - 2012-02-08 
 
   INCOMPATIBLE CHANGES

+ 3 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestFSMainOperationsLocalFileSystem.java

@@ -37,15 +37,15 @@ public class TestFSMainOperationsLocalFileSystem extends FSMainOperationsBaseTes
   public void setUp() throws Exception {
     Configuration conf = new Configuration();
     fcTarget = FileSystem.getLocal(conf);
-    fSys = ViewFileSystemTestSetup.setupForViewFs(
-        ViewFileSystemTestSetup.configWithViewfsScheme(), fcTarget);
+    fSys = ViewFileSystemTestSetup.setupForViewFileSystem(
+        ViewFileSystemTestSetup.createConfig(), fcTarget);
     super.setUp();
   }
   
   @After
   public void tearDown() throws Exception {
     super.tearDown();
-    ViewFileSystemTestSetup.tearDownForViewFs(fcTarget);
+    ViewFileSystemTestSetup.tearDown(fcTarget);
   }
   
   @Test

+ 2 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java

@@ -40,12 +40,12 @@ public class TestViewFileSystemDelegation { //extends ViewFileSystemTestSetup {
 
   @BeforeClass
   public static void setup() throws Exception {
-    conf = ViewFileSystemTestSetup.configWithViewfsScheme();    
+    conf = ViewFileSystemTestSetup.createConfig();
     fs1 = setupFileSystem(new URI("fs1:/"), FakeFileSystem.class);
     fs2 = setupFileSystem(new URI("fs2:/"), FakeFileSystem.class);
     viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
   }
-  
+
   static FakeFileSystem setupFileSystem(URI uri, Class clazz)
       throws Exception {
     String scheme = uri.getScheme();

+ 5 - 39
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java

@@ -35,7 +35,6 @@ import org.mortbay.log.Log;
 public class TestViewFsTrash {
   FileSystem fsTarget;  // the target file system - the mount will point here
   FileSystem fsView;
-  Path targetTestRoot;
   Configuration conf;
 
   static class TestLFS extends LocalFileSystem {
@@ -55,52 +54,19 @@ public class TestViewFsTrash {
   @Before
   public void setUp() throws Exception {
     fsTarget = FileSystem.getLocal(new Configuration());
-    targetTestRoot = FileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
-    // In case previous test was killed before cleanup
-    fsTarget.delete(targetTestRoot, true);
-    // cleanup trash from previous run if it stuck around
-    fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"),
-        true);
-    
-    fsTarget.mkdirs(targetTestRoot);
-    fsTarget.mkdirs(new Path(targetTestRoot,"dir1"));
-    
-    
-    // Now we use the mount fs to set links to user and dir
-    // in the test root
-    
-    // Set up the defaultMT in the config with our mount point links
-
-
-    conf = ViewFileSystemTestSetup.configWithViewfsScheme();
-    
-    // create a link for home directory so that trash path works
-    // set up viewfs's home dir root to point to home dir root on target
-    // But home dir is different on linux, mac etc.
-    // Figure it out by calling home dir on target
-
-    String homeDirRoot = fsTarget.getHomeDirectory()
-        .getParent().toUri().getPath();
-    ConfigUtil.addLink(conf, homeDirRoot,
-        fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
-    ConfigUtil.setHomeDirConf(conf, homeDirRoot);
-    Log.info("Home dir base " + homeDirRoot);
-
-    fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget);
-
-    // set working dir so that relative paths
-    //fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath()));
+    fsTarget.mkdirs(new Path(FileSystemTestHelper.
+        getTestRootPath(fsTarget), "dir1"));
+    conf = ViewFileSystemTestSetup.createConfig();
+    fsView = ViewFileSystemTestSetup.setupForViewFileSystem(conf, fsTarget);
     conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString());
   }
  
-
   @After
   public void tearDown() throws Exception {
-    fsTarget.delete(targetTestRoot, true);
+    ViewFileSystemTestSetup.tearDown(fsTarget);
     fsTarget.delete(new Path(fsTarget.getHomeDirectory(), ".Trash/Current"),
         true);
   }
-
   
   @Test
   public void testTrash() throws IOException {

+ 1 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java

@@ -89,7 +89,7 @@ public class ViewFileSystemBaseTest {
     
     // Set up the defaultMT in the config with our mount point links
     //Configuration conf = new Configuration();
-    conf = ViewFileSystemTestSetup.configWithViewfsScheme();
+    conf = ViewFileSystemTestSetup.createConfig();
     setupMountPoints();
     fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
   }

+ 13 - 23
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemTestSetup.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.mortbay.log.Log;
 
 
 /**
@@ -46,32 +47,21 @@ public class ViewFileSystemTestSetup {
   * @return the ViewFS File context to be used for tests
    * @throws Exception
    */
-  static public FileSystem setupForViewFs(Configuration conf, FileSystem fsTarget) throws Exception {
+  static public FileSystem setupForViewFileSystem(Configuration conf, FileSystem fsTarget) throws Exception {
     /**
      * create the test root on local_fs - the  mount table will point here
      */
-    Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget);
-    // In case previous test was killed before cleanup
-    fsTarget.delete(targetOfTests, true);
-    
-    fsTarget.mkdirs(targetOfTests);
-  
+    fsTarget.mkdirs(FileSystemTestHelper.getTestRootPath(fsTarget));
+
+    // viewFs://home => fsTarget://home
+    String homeDirRoot = fsTarget.getHomeDirectory()
+        .getParent().toUri().getPath();
+    ConfigUtil.addLink(conf, homeDirRoot,
+        fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
+    ConfigUtil.setHomeDirConf(conf, homeDirRoot);
+    Log.info("Home dir base " + homeDirRoot);
 
-    // Now set up a link from viewfs to targetfs for the first component of
-    // path of testdir. For example, if testdir is /user/<userid>/xx then
-    // a link from /user to targetfs://user.
-    
-    String testDir = FileSystemTestHelper.getTestRootPath(fsTarget).toUri().getPath();
-    int indexOf2ndSlash = testDir.indexOf('/', 1);
-    String testDirFirstComponent = testDir.substring(0, indexOf2ndSlash);
-    
-    
-    ConfigUtil.addLink(conf, testDirFirstComponent,
-        fsTarget.makeQualified(new Path(testDirFirstComponent)).toUri()); 
-    
     FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
-    //System.out.println("SRCOfTests = "+ getTestRootPath(fs, "test"));
-    //System.out.println("TargetOfTests = "+ targetOfTests.toUri());
     return fsView;
   }
 
@@ -79,12 +69,12 @@ public class ViewFileSystemTestSetup {
    * 
    * delete the test directory in the target  fs
    */
-  static public void tearDownForViewFs(FileSystem fsTarget) throws Exception {
+  static public void tearDown(FileSystem fsTarget) throws Exception {
     Path targetOfTests = FileSystemTestHelper.getTestRootPath(fsTarget);
     fsTarget.delete(targetOfTests, true);
   }
   
-  public static Configuration configWithViewfsScheme() {
+  public static Configuration createConfig() {
     Configuration conf = new Configuration();
     conf.set("fs.viewfs.impl", ViewFileSystem.class.getName());
     return conf; 
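
The consolidated helper above now does the home-directory mounting that TestViewFsTrash previously hand-rolled (HADOOP-8036: the old code assumed the user's home directory was exactly two levels deep). A minimal sketch of the same wiring outside the test harness, using only the calls visible in this diff; the driver class itself is hypothetical:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.ViewFileSystem;

// Hypothetical driver: mount the parent of the user's home directory into
// viewfs so trash paths resolve no matter how deep the home directory sits.
public class ViewFsMountSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.viewfs.impl", ViewFileSystem.class.getName());
    FileSystem fsTarget = FileSystem.getLocal(conf);

    // viewfs://<home root> => fsTarget://<home root>
    String homeDirRoot = fsTarget.getHomeDirectory().getParent().toUri().getPath();
    ConfigUtil.addLink(conf, homeDirRoot,
        fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
    ConfigUtil.setHomeDirConf(conf, homeDirRoot);

    FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, conf);
    System.out.println("view mounted at " + fsView.getUri());
  }
}
```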

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -213,6 +213,9 @@ Trunk (unreleased changes)
     dfs.client.block.write.replace-datanode-on-failure.enable to be mistakenly
     disabled. (atm)
 
+    HDFS-2968. Protocol translator for BlockRecoveryCommand broken when
+    multiple blocks need recovery. (todd)
+
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES
@@ -257,6 +260,8 @@ Release 0.23.2 - UNRELEASED
     HDFS-2938. Recursive delete of a large directory make namenode
     unresponsive. (Hari Mankude via suresh)
 
+    HDFS-2969. ExtendedBlock.equals is incorrectly implemented (todd)
+
 Release 0.23.1 - 2012-02-08 
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ExtendedBlock.java

@@ -145,7 +145,7 @@ public class ExtendedBlock implements Writable {
       return false;
     }
     ExtendedBlock b = (ExtendedBlock)o;
-    return b.block.equals(block) || b.poolId.equals(poolId);
+    return b.block.equals(block) && b.poolId.equals(poolId);
   }
   
   @Override // Object
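
This one-character change is the whole of HDFS-2969: with ||, two blocks compared equal whenever either the underlying block or the pool ID matched, so blocks with the same ID in different pools collided in hash-based collections. A standalone sketch of the difference, with hypothetical names rather than the real HDFS classes:

```java
// Minimal sketch: why '||' breaks a compound identity check.
public class EqualsDemo {
  static final class BlockId {
    final String poolId;
    final long blockId;
    BlockId(String poolId, long blockId) { this.poolId = poolId; this.blockId = blockId; }

    // Buggy variant, mirroring the pre-fix code: matches if EITHER field matches.
    boolean buggyEquals(BlockId o) {
      return blockId == o.blockId || poolId.equals(o.poolId);
    }

    // Fixed variant: both components of the identity must match.
    boolean fixedEquals(BlockId o) {
      return blockId == o.blockId && poolId.equals(o.poolId);
    }
  }

  public static void main(String[] args) {
    BlockId a = new BlockId("pool-a", 1);
    BlockId b = new BlockId("pool-b", 1);  // same block id, different pool
    System.out.println(a.buggyEquals(b));  // true  -- wrong: pools differ
    System.out.println(a.fixedEquals(b));  // false -- correct
  }
}
```

The new TestExtendedBlock further down covers exactly these cases.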

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -770,8 +770,9 @@ public class PBHelper {
     List<RecoveringBlockProto> list = recoveryCmd.getBlocksList();
     List<RecoveringBlock> recoveringBlocks = new ArrayList<RecoveringBlock>(
         list.size());
-    for (int i = 0; i < list.size(); i++) {
-      recoveringBlocks.add(PBHelper.convert(list.get(0)));
+    
+    for (RecoveringBlockProto rbp : list) {
+      recoveringBlocks.add(PBHelper.convert(rbp));
     }
     return new BlockRecoveryCommand(recoveringBlocks);
   }
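
HDFS-2968 in miniature: the removed loop indexed correctly but converted list.get(0) on every pass, so a recovery command carrying N blocks was translated into N copies of the first. The enhanced for loop eliminates the index altogether. A toy reproduction of the bug pattern (the convert helper here is hypothetical, not PBHelper's):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of an index loop that always reads element 0, and its fix.
public class ConvertLoopDemo {
  static String convert(String proto) { return proto.toUpperCase(); }

  public static void main(String[] args) {
    List<String> protos = Arrays.asList("blk-1", "blk-2", "blk-3");

    // Buggy: iterates the right number of times, but converts get(0) every pass.
    List<String> buggy = new ArrayList<String>();
    for (int i = 0; i < protos.size(); i++) {
      buggy.add(convert(protos.get(0)));
    }
    System.out.println(buggy); // [BLK-1, BLK-1, BLK-1]

    // Fixed: an enhanced for loop makes the stale-index mistake impossible.
    List<String> fixed = new ArrayList<String>();
    for (String p : protos) {
      fixed.add(convert(p));
    }
    System.out.println(fixed); // [BLK-1, BLK-2, BLK-3]
  }
}
```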

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java

@@ -32,6 +32,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 
+import com.google.common.base.Joiner;
+
 /**
  * BlockRecoveryCommand is an instruction to a data-node to recover
  * the specified blocks.
@@ -138,6 +140,15 @@ public class BlockRecoveryCommand extends DatanodeCommand {
   public void add(RecoveringBlock block) {
     recoveringBlocks.add(block);
   }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("BlockRecoveryCommand(\n  ");
+    Joiner.on("\n  ").appendTo(sb, recoveringBlocks);
+    sb.append("\n)");
+    return sb.toString();
+  }
 
   ///////////////////////////////////////////
   // Writable
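
The added toString() leans on Guava's Joiner.appendTo, which writes a joined iterable directly into an existing StringBuilder instead of building an intermediate string. A minimal usage sketch:

```java
import com.google.common.base.Joiner;
import java.util.Arrays;

// Sketch of the Joiner idiom used by the new toString() above.
public class JoinerDemo {
  public static void main(String[] args) {
    StringBuilder sb = new StringBuilder("BlockRecoveryCommand(\n  ");
    Joiner.on("\n  ").appendTo(sb, Arrays.asList("block-1", "block-2"));
    sb.append("\n)");
    System.out.println(sb);
    // BlockRecoveryCommand(
    //   block-1
    //   block-2
    // )
  }
}
```

TestPBHelper below compares these string forms to verify the protobuf round-trip.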

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsFileStatusHdfs.java

@@ -73,7 +73,7 @@ public class TestViewFsFileStatusHdfs {
 
    long len = FileSystemTestHelper.createFile(fHdfs, testfilename);
 
-    Configuration conf = ViewFileSystemTestSetup.configWithViewfsScheme();
+    Configuration conf = ViewFileSystemTestSetup.createConfig();
     ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + "/tmp"));
     FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
     assertEquals(ViewFileSystem.class, vfs.getClass());

+ 56 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+
+public class TestExtendedBlock {
+  static final String POOL_A = "blockpool-a";
+  static final String POOL_B = "blockpool-b";
+  static final Block BLOCK_1_GS1 = new Block(1L, 100L, 1L);
+  static final Block BLOCK_1_GS2 = new Block(1L, 100L, 2L);
+  static final Block BLOCK_2_GS1 = new Block(2L, 100L, 1L);
+  
+  @Test
+  public void testEquals() {
+    // Same block -> equal
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1));
+    // Different pools, same block id -> not equal
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_B, BLOCK_1_GS1));
+    // Same pool, different block id -> not equal
+    assertNotEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_2_GS1));
+    // Same block, different genstamps -> equal
+    assertEquals(
+        new ExtendedBlock(POOL_A, BLOCK_1_GS1),
+        new ExtendedBlock(POOL_A, BLOCK_1_GS2));
+  }
+
+  private static void assertNotEquals(Object a, Object b) {
+    assertFalse("expected not equal: '" + a + "' and '" + b + "'",
+        a.equals(b));
+  }
+}

+ 36 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -68,6 +70,10 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Test;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 /**
  * Tests for {@link PBHelper}
  */
@@ -265,9 +271,12 @@ public class TestPBHelper {
       compare(logs.get(i), logs1.get(i));
     }
   }
-  
   public ExtendedBlock getExtendedBlock() {
-    return new ExtendedBlock("bpid", 1, 100, 2);
+    return getExtendedBlock(1);
+  }
+  
+  public ExtendedBlock getExtendedBlock(long blkid) {
+    return new ExtendedBlock("bpid", blkid, 100, 2);
   }
   
   public DatanodeInfo getDNInfo() {
@@ -318,6 +327,31 @@ public class TestPBHelper {
     }
   }
   
+  @Test
+  public void testConvertBlockRecoveryCommand() {
+    DatanodeInfo[] dnInfo = new DatanodeInfo[] { getDNInfo(), getDNInfo() };
+
+    List<RecoveringBlock> blks = ImmutableList.of(
+      new RecoveringBlock(getExtendedBlock(1), dnInfo, 3),
+      new RecoveringBlock(getExtendedBlock(2), dnInfo, 3)
+    );
+    
+    BlockRecoveryCommand cmd = new BlockRecoveryCommand(blks);
+    BlockRecoveryCommandProto proto = PBHelper.convert(cmd);
+    assertEquals(1, proto.getBlocks(0).getBlock().getB().getBlockId());
+    assertEquals(2, proto.getBlocks(1).getBlock().getB().getBlockId());
+    
+    BlockRecoveryCommand cmd2 = PBHelper.convert(proto);
+    
+    List<RecoveringBlock> cmd2Blks = Lists.newArrayList(
+        cmd2.getRecoveringBlocks());
+    assertEquals(blks.get(0).getBlock(), cmd2Blks.get(0).getBlock());
+    assertEquals(blks.get(1).getBlock(), cmd2Blks.get(1).getBlock());
+    assertEquals(Joiner.on(",").join(blks), Joiner.on(",").join(cmd2Blks));
+    assertEquals(cmd.toString(), cmd2.toString());
+  }
+  
+  
   @Test
   public void testConvertText() {
     Text t = new Text("abc".getBytes());

+ 5 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -105,6 +105,8 @@ Release 0.23.2 - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
+    MAPREDUCE-3862.  Nodemanager can appear to hang on shutdown due to lingering
+    DeletionService threads (Jason Lowe via bobby)
 
     MAPREDUCE-3680. FifoScheduler web service rest API can print out invalid 
     JSON. (B Anil Kumar via tgraves)
@@ -121,6 +123,9 @@ Release 0.23.2 - UNRELEASED
     MAPREDUCE-3856. Instances of RunningJob class give incorrect job tracking
     urls when multiple jobs are submitted from same client jvm. (Eric Payne via
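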
     sseth)
+
+    MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to
+    avoid NumberFormatException caused by overflow.  (Zhihong Yu via szetszwo)
  
 Release 0.23.1 - 2012-02-08 
 

+ 66 - 58
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree extends ProcessTree {
   // to a test directory.
   private String procfsDir;
   
-  private Integer pid = -1;
+  static private String deadPid = "-1";
+  private String pid = deadPid;
+  static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
   private Long cpuTime = 0L;
   private boolean setsidUsed = false;
   private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
 
-  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+  private Map<String, ProcessInfo> processTree = new HashMap<String, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
     this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
@@ -166,19 +169,19 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * @return the process-tree with latest state.
    */
   public ProcfsBasedProcessTree getProcessTree() {
-    if (pid != -1) {
+    if (!pid.equals(deadPid)) {
       // Get the list of processes
-      List<Integer> processList = getProcessList();
+      List<String> processList = getProcessList();
 
-      Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+      Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
       
       // cache the processTree to get the age for processes
-      Map<Integer, ProcessInfo> oldProcs = 
-              new HashMap<Integer, ProcessInfo>(processTree);
+      Map<String, ProcessInfo> oldProcs = 
+              new HashMap<String, ProcessInfo>(processTree);
       processTree.clear();
 
       ProcessInfo me = null;
-      for (Integer proc : processList) {
+      for (String proc : processList) {
         // Get information for each process
         ProcessInfo pInfo = new ProcessInfo(proc);
         if (constructProcessInfo(pInfo, procfsDir) != null) {
@@ -195,9 +198,9 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       }
 
       // Add each process to its parent.
-      for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
-        Integer pID = entry.getKey();
-        if (pID != 1) {
+      for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
+        String pID = entry.getKey();
+        if (!pID.equals("1")) {
           ProcessInfo pInfo = entry.getValue();
           ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
           if (parentPInfo != null) {
@@ -218,7 +221,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       }
 
       // update age values and compute the number of jiffies since last update
-      for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
+      for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
         ProcessInfo oldInfo = oldProcs.get(procs.getKey());
         if (procs.getValue() != null) {
           procs.getValue().updateJiffy(oldInfo);
@@ -242,10 +245,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * @return true if the root-process is alive, false otherwise.
    */
   public boolean isAlive() {
-    if (pid == -1) {
+    if (pid.equals(deadPid)) {
       return false;
     } else {
-      return isAlive(pid.toString());
+      return isAlive(pid);
     }
   }
 
@@ -256,8 +259,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    *           alive, false otherwise.
    */
   public boolean isAnyProcessInTreeAlive() {
-    for (Integer pId : processTree.keySet()) {
-      if (isAlive(pId.toString())) {
+    for (String pId : processTree.keySet()) {
+      if (isAlive(pId)) {
         return true;
       }
     }
@@ -269,9 +272,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * @param procfsDir  Procfs root dir
    */
   static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
-    Integer pId = Integer.parseInt(pidStr);
     // Get information for this process
-    ProcessInfo pInfo = new ProcessInfo(pId);
+    ProcessInfo pInfo = new ProcessInfo(pidStr);
     pInfo = constructProcessInfo(pInfo, procfsDir);
     if (pInfo == null) {
       // process group leader may have finished execution, but we still need to
@@ -279,14 +281,15 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return true;
     }
 
+    String pgrpId = pInfo.getPgrpId().toString();
     //make sure that pId and its pgrpId match
-    if (!pInfo.getPgrpId().equals(pId)) {
-      LOG.warn("Unexpected: Process with PID " + pId +
-               " is not a process group leader.");
+    if (!pgrpId.equals(pidStr)) {
+      LOG.warn("Unexpected: Process with PID " + pidStr +
+               " is not a process group leader. pgrpId is: " + pInfo.getPgrpId());
       return false;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug(pId + " is a process group leader, as expected.");
+      LOG.debug(pidStr + " is a process group leader, as expected.");
     }
     return true;
   }
@@ -324,7 +327,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    */
   public void destroy(boolean inBackground) {
     LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
-    if (pid == -1) {
+    if (pid.equals(deadPid)) {
       return;
     }
     if (isAlive(pid.toString())) {
@@ -347,7 +350,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =
-      "\t|- %d %d %d %d %s %d %d %d %d %s\n";
+      "\t|- %s %s %d %d %s %d %d %d %d %s\n";
 
   /**
    * Get a dump of the process-tree.
@@ -458,34 +461,27 @@ public class ProcfsBasedProcessTree extends ProcessTree {
     return cpuTime;
   }
 
-  private static Integer getValidPID(String pid) {
-    Integer retPid = -1;
-    try {
-      retPid = Integer.parseInt(pid);
-      if (retPid <= 0) {
-        retPid = -1;
-      }
-    } catch (NumberFormatException nfe) {
-      retPid = -1;
-    }
-    return retPid;
+  private static String getValidPID(String pid) {
+    if (pid == null) return deadPid;
+    Matcher m = numberPattern.matcher(pid);
+    if (m.matches()) return pid;
+    return deadPid;
   }
 
   /**
    * Get the list of all processes in the system.
    */
-  private List<Integer> getProcessList() {
+  private List<String> getProcessList() {
     String[] processDirs = (new File(procfsDir)).list();
-    List<Integer> processList = new ArrayList<Integer>();
+    List<String> processList = new ArrayList<String>();
 
     for (String dir : processDirs) {
+      Matcher m = numberPattern.matcher(dir);
+      if (!m.matches()) continue;
       try {
-        int pd = Integer.parseInt(dir);
         if ((new File(procfsDir, dir)).isDirectory()) {
-          processList.add(Integer.valueOf(pd));
+          processList.add(dir);
         }
-      } catch (NumberFormatException n) {
-        // skip this directory
       } catch (SecurityException s) {
         // skip this process
       }
@@ -511,7 +507,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
     BufferedReader in = null;
     FileReader fReader = null;
     try {
-      File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
+      File pidDir = new File(procfsDir, pinfo.getPid());
       fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE));
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
@@ -528,9 +524,9 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       boolean mat = m.find();
       if (mat) {
         // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
-        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+        pinfo.updateProcessInfo(m.group(2), m.group(3),
                 Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
-                Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+                Long.parseLong(m.group(7)), new BigInteger(m.group(8)),
                 Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
       } else {
         LOG.warn("Unexpected: procfs stat file is not in the expected format"
@@ -562,7 +558,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    */
   public String toString() {
     StringBuffer pTree = new StringBuffer("[ ");
-    for (Integer p : processTree.keySet()) {
+    for (String p : processTree.keySet()) {
       pTree.append(p);
       pTree.append(" ");
     }
@@ -575,15 +571,16 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * 
    */
   private static class ProcessInfo {
-    private Integer pid; // process-id
+    private String pid; // process-id
     private String name; // command name
     private Integer pgrpId; // process group-id
-    private Integer ppid; // parent process-id
+    private String ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
     private Long rssmemPage; // rss memory usage in # of pages
     private Long utime = 0L; // # of jiffies in user mode
-    private Long stime = 0L; // # of jiffies in kernel mode
+    private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
+    private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
     // how many times has this process been seen alive
     private int age; 
 
@@ -595,13 +592,13 @@ public class ProcfsBasedProcessTree extends ProcessTree {
 
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
-    public ProcessInfo(int pid) {
-      this.pid = Integer.valueOf(pid);
+    public ProcessInfo(String pid) {
+      this.pid = pid;
       // seeing this the first time.
       this.age = 1;
     }
 
-    public Integer getPid() {
+    public String getPid() {
       return pid;
     }
 
@@ -613,7 +610,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return pgrpId;
     }
 
-    public Integer getPpid() {
+    public String getPpid() {
       return ppid;
     }
 
@@ -629,7 +626,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return utime;
     }
 
-    public Long getStime() {
+    public BigInteger getStime() {
       return stime;
     }
 
@@ -652,8 +649,8 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return false;
     }
 
-    public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
-        Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
+    public void updateProcessInfo(String name, String ppid, Integer pgrpId,
+        Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) {
       this.name = name;
       this.ppid = ppid;
       this.pgrpId = pgrpId;
@@ -665,8 +662,19 @@ public class ProcfsBasedProcessTree extends ProcessTree {
     }
 
     public void updateJiffy(ProcessInfo oldInfo) {
-      this.dtime = (oldInfo == null ? this.utime + this.stime
-              : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
+      if (oldInfo == null) {
+        BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime));
+        if (sum.compareTo(MAX_LONG) > 0) {
+          this.dtime = 0L;
+          LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime
+              + ") is greater than " + Long.MAX_VALUE);
+        } else {
+          this.dtime = sum.longValue();
+        }
+        return;
+      }
+      this.dtime = (this.utime - oldInfo.utime +
+          this.stime.subtract(oldInfo.stime).longValue());
     }
 
     public void updateAge(ProcessInfo oldInfo) {
@@ -690,7 +698,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       FileReader fReader = null;
       try {
         fReader =
-            new FileReader(new File(new File(procfsDir, pid.toString()),
+            new FileReader(new File(new File(procfsDir, pid),
                 PROCFS_CMDLINE_FILE));
       } catch (FileNotFoundException f) {
         // The process vanished in the interim!
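
The thread running through MAPREDUCE-3583: pid moves from Integer to String (a pid is an opaque directory name under /proc, not arithmetic data), and stime moves to BigInteger because the kernel-mode jiffy counter in /proc/<pid>/stat can exceed Long.MAX_VALUE on long-lived hosts, which made Long.parseLong throw the NumberFormatException the JIRA describes. A small demonstration of the two parsing paths; the value used is one past Long.MAX_VALUE:

```java
import java.math.BigInteger;

// Sketch of the overflow that motivated the BigInteger change above.
public class StimeOverflowDemo {
  public static void main(String[] args) {
    String stime = "9223372036854775808"; // Long.MAX_VALUE + 1

    try {
      Long.parseLong(stime);              // the old parsing path
    } catch (NumberFormatException e) {
      System.out.println("Long.parseLong rejects it: " + e.getMessage());
    }

    BigInteger big = new BigInteger(stime); // the new parsing path
    System.out.println("BigInteger parses it: " + big);
  }
}
```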

+ 62 - 52
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java

@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -91,12 +92,14 @@ public class ProcfsBasedProcessTree {
   // to a test directory.
   private String procfsDir;
   
-  protected final Integer pid;
+  static private String deadPid = "-1";
+  private String pid = deadPid;
+  static private Pattern numberPattern = Pattern.compile("[1-9][0-9]*");
   private Long cpuTime = 0L;
   private boolean setsidUsed = false;
 
-  protected Map<Integer, ProcessInfo> processTree =
-    new HashMap<Integer, ProcessInfo>();
+  protected Map<String, ProcessInfo> processTree =
+    new HashMap<String, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
     this(pid, false);
@@ -150,19 +153,19 @@ public class ProcfsBasedProcessTree {
    * @return the process-tree with latest state.
    */
   public ProcfsBasedProcessTree getProcessTree() {
-    if (pid != -1) {
+    if (!pid.equals(deadPid)) {
       // Get the list of processes
-      List<Integer> processList = getProcessList();
+      List<String> processList = getProcessList();
 
-      Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+      Map<String, ProcessInfo> allProcessInfo = new HashMap<String, ProcessInfo>();
       
       // cache the processTree to get the age for processes
-      Map<Integer, ProcessInfo> oldProcs = 
-              new HashMap<Integer, ProcessInfo>(processTree);
+      Map<String, ProcessInfo> oldProcs = 
+              new HashMap<String, ProcessInfo>(processTree);
       processTree.clear();
 
       ProcessInfo me = null;
-      for (Integer proc : processList) {
+      for (String proc : processList) {
         // Get information for each process
         ProcessInfo pInfo = new ProcessInfo(proc);
         if (constructProcessInfo(pInfo, procfsDir) != null) {
@@ -179,9 +182,9 @@ public class ProcfsBasedProcessTree {
       }
 
       // Add each process to its parent.
-      for (Map.Entry<Integer, ProcessInfo> entry : allProcessInfo.entrySet()) {
-        Integer pID = entry.getKey();
-        if (pID != 1) {
+      for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
+        String pID = entry.getKey();
+        if (!pID.equals("1")) {
           ProcessInfo pInfo = entry.getValue();
           ProcessInfo parentPInfo = allProcessInfo.get(pInfo.getPpid());
           if (parentPInfo != null) {
@@ -202,7 +205,7 @@ public class ProcfsBasedProcessTree {
       }
 
       // update age values and compute the number of jiffies since last update
-      for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
+      for (Map.Entry<String, ProcessInfo> procs : processTree.entrySet()) {
         ProcessInfo oldInfo = oldProcs.get(procs.getKey());
         if (procs.getValue() != null) {
           procs.getValue().updateJiffy(oldInfo);
@@ -227,20 +230,22 @@ public class ProcfsBasedProcessTree {
     return checkPidPgrpidForMatch(pid, PROCFS);
   }
 
-  public static boolean checkPidPgrpidForMatch(int _pid, String procfs) {
+  public static boolean checkPidPgrpidForMatch(String _pid, String procfs) {
     // Get information for this process
     ProcessInfo pInfo = new ProcessInfo(_pid);
     pInfo = constructProcessInfo(pInfo, procfs);
     // null if process group leader finished execution; issue no warning
     // make sure that pid and its pgrpId match
-    return pInfo == null || pInfo.getPgrpId().equals(_pid);
+    if (pInfo == null) return true;
+    String pgrpId = pInfo.getPgrpId().toString();
+    return pgrpId.equals(_pid);
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =
-      "\t|- %d %d %d %d %s %d %d %d %d %s\n";
+      "\t|- %s %s %d %d %s %d %d %d %d %s\n";
 
-  public List<Integer> getCurrentProcessIDs() {
-    List<Integer> currentPIDs = new ArrayList<Integer>();
+  public List<String> getCurrentProcessIDs() {
+    List<String> currentPIDs = new ArrayList<String>();
     currentPIDs.addAll(processTree.keySet());
     return currentPIDs;
   }
@@ -354,34 +359,27 @@ public class ProcfsBasedProcessTree {
     return cpuTime;
   }
 
-  private static Integer getValidPID(String pid) {
-    Integer retPid = -1;
-    try {
-      retPid = Integer.parseInt(pid);
-      if (retPid <= 0) {
-        retPid = -1;
-      }
-    } catch (NumberFormatException nfe) {
-      retPid = -1;
-    }
-    return retPid;
+  private static String getValidPID(String pid) {
+    if (pid == null) return deadPid;
+    Matcher m = numberPattern.matcher(pid);
+    if (m.matches()) return pid;
+    return deadPid;
   }
 
   /**
    * Get the list of all processes in the system.
    */
-  private List<Integer> getProcessList() {
+  private List<String> getProcessList() {
     String[] processDirs = (new File(procfsDir)).list();
-    List<Integer> processList = new ArrayList<Integer>();
+    List<String> processList = new ArrayList<String>();
 
     for (String dir : processDirs) {
+      Matcher m = numberPattern.matcher(dir);
+      if (!m.matches()) continue;
       try {
-        int pd = Integer.parseInt(dir);
         if ((new File(procfsDir, dir)).isDirectory()) {
-          processList.add(Integer.valueOf(pd));
+          processList.add(dir);
         }
-      } catch (NumberFormatException n) {
-        // skip this directory
       } catch (SecurityException s) {
         // skip this process
       }
@@ -407,7 +405,7 @@ public class ProcfsBasedProcessTree {
     BufferedReader in = null;
     FileReader fReader = null;
     try {
-      File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
+      File pidDir = new File(procfsDir, pinfo.getPid());
       fReader = new FileReader(new File(pidDir, PROCFS_STAT_FILE));
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
@@ -424,9 +422,9 @@ public class ProcfsBasedProcessTree {
       boolean mat = m.find();
       if (mat) {
         // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
-        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+        pinfo.updateProcessInfo(m.group(2), m.group(3),
                 Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
-                Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+                Long.parseLong(m.group(7)), new BigInteger(m.group(8)),
                 Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
       } else {
         LOG.warn("Unexpected: procfs stat file is not in the expected format"
@@ -458,7 +456,7 @@ public class ProcfsBasedProcessTree {
    */
   public String toString() {
     StringBuffer pTree = new StringBuffer("[ ");
-    for (Integer p : processTree.keySet()) {
+    for (String p : processTree.keySet()) {
       pTree.append(p);
       pTree.append(" ");
     }
@@ -471,15 +469,16 @@ public class ProcfsBasedProcessTree {
    * 
    */
   private static class ProcessInfo {
-    private Integer pid; // process-id
+    private String pid; // process-id
     private String name; // command name
     private Integer pgrpId; // process group-id
-    private Integer ppid; // parent process-id
+    private String ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
     private Long rssmemPage; // rss memory usage in # of pages
     private Long utime = 0L; // # of jiffies in user mode
-    private Long stime = 0L; // # of jiffies in kernel mode
+    private final BigInteger MAX_LONG = BigInteger.valueOf(Long.MAX_VALUE);
+    private BigInteger stime = new BigInteger("0"); // # of jiffies in kernel mode
     // how many times has this process been seen alive
     private int age; 
 
@@ -491,13 +490,13 @@ public class ProcfsBasedProcessTree {
 
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
-    public ProcessInfo(int pid) {
-      this.pid = Integer.valueOf(pid);
+    public ProcessInfo(String pid) {
+      this.pid = pid;
       // seeing this the first time.
       this.age = 1;
     }
 
-    public Integer getPid() {
+    public String getPid() {
       return pid;
     }
 
@@ -509,7 +508,7 @@ public class ProcfsBasedProcessTree {
       return pgrpId;
     }
 
-    public Integer getPpid() {
+    public String getPpid() {
       return ppid;
     }
 
@@ -525,7 +524,7 @@ public class ProcfsBasedProcessTree {
       return utime;
     }
 
-    public Long getStime() {
+    public BigInteger getStime() {
       return stime;
     }
 
@@ -548,8 +547,8 @@ public class ProcfsBasedProcessTree {
       return false;
     }
 
-    public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
-        Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
+    public void updateProcessInfo(String name, String ppid, Integer pgrpId,
+        Integer sessionId, Long utime, BigInteger stime, Long vmem, Long rssmem) {
       this.name = name;
       this.ppid = ppid;
       this.pgrpId = pgrpId;
@@ -559,10 +558,21 @@ public class ProcfsBasedProcessTree {
       this.vmem = vmem;
       this.rssmemPage = rssmem;
     }
-
+    
     public void updateJiffy(ProcessInfo oldInfo) {
-      this.dtime = (oldInfo == null ? this.utime + this.stime
-              : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
+      if (oldInfo == null) {
+        BigInteger sum = this.stime.add(BigInteger.valueOf(this.utime));
+        if (sum.compareTo(MAX_LONG) > 0) {
+          this.dtime = 0L;
+          LOG.warn("Sum of stime (" + this.stime + ") and utime (" + this.utime
+              + ") is greater than " + Long.MAX_VALUE);
+        } else {
+          this.dtime = sum.longValue();
+        }
+        return;
+      }
+      this.dtime = (this.utime - oldInfo.utime +
+          this.stime.subtract(oldInfo.stime).longValue());
     }
 
     public void updateAge(ProcessInfo oldInfo) {
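
The YARN copy of the class receives the same rewrite. Worth noting is the [1-9][0-9]* pattern that now gates both getValidPID and getProcessList: it admits exactly the positive decimal names /proc uses for process directories and rejects entries such as self or meminfo without the old parse-and-catch dance. A quick check of its behavior:

```java
import java.util.regex.Pattern;

// Sketch of the PID validation pattern introduced above.
public class PidPatternDemo {
  private static final Pattern NUMBER = Pattern.compile("[1-9][0-9]*");

  static boolean isValidPid(String s) {
    return s != null && NUMBER.matcher(s).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidPid("4242")); // true
    System.out.println(isValidPid("self")); // false -- /proc/self symlink
    System.out.println(isValidPid("0"));    // false -- no pid 0 entry
    System.out.println(isValidPid("007"));  // false -- leading zero
    System.out.println(isValidPid(null));   // false
  }
}
```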

+ 3 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestProcfsBasedProcessTree.java

@@ -527,7 +527,7 @@ public class TestProcfsBasedProcessTree {
 
       // Let us not create stat file for pid 100.
       Assert.assertTrue(ProcfsBasedProcessTree.checkPidPgrpidForMatch(
-            Integer.valueOf(pid), procfsRootDir.getAbsolutePath()));
+            pid, procfsRootDir.getAbsolutePath()));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -662,8 +662,8 @@ public class TestProcfsBasedProcessTree {
    */
   private static boolean isAnyProcessInTreeAlive(
       ProcfsBasedProcessTree processTree) {
-    for (Integer pId : processTree.getCurrentProcessIDs()) {
-      if (isAlive(pId.toString())) {
+    for (String pId : processTree.getCurrentProcessIDs()) {
+      if (isAlive(pId)) {
         return true;
       }
     }

+ 16 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java

@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import static java.util.concurrent.TimeUnit.*;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -85,6 +86,7 @@ public class DeletionService extends AbstractService {
       sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
           tf);
     }
+    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
     super.init(conf);
   }
@@ -92,14 +94,27 @@ public class DeletionService extends AbstractService {
   @Override
   public void stop() {
     sched.shutdown();
+    boolean terminated = false;
     try {
-      sched.awaitTermination(10, SECONDS);
+      terminated = sched.awaitTermination(10, SECONDS);
     } catch (InterruptedException e) {
+    }
+    if (terminated != true) {
       sched.shutdownNow();
     }
     super.stop();
   }
 
+  /**
+   * Determine if the service has completely stopped.
+   * Used only by unit tests
+   * @return true if service has completely stopped
+   */
+  @Private
+  public boolean isTerminated() {
+    return getServiceState() == STATE.STOPPED && sched.isTerminated();
+  }
+
   private class FileDeletion implements Runnable {
     final String user;
     final Path subDir;
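
MAPREDUCE-3862: a ScheduledThreadPoolExecutor keeps its workers alive after shutdown() while delayed tasks remain queued, which is what made the NodeManager appear to hang. Setting the delayed-task policy to false drops those tasks at shutdown, and the bounded awaitTermination plus shutdownNow above is the backstop. A self-contained sketch of that sequence:

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of the shutdown sequence adopted by DeletionService.stop().
public class GracefulShutdownDemo {
  public static void main(String[] args) {
    ScheduledThreadPoolExecutor sched = new ScheduledThreadPoolExecutor(1);
    // Without this, a queued delayed task keeps the pool alive after shutdown().
    sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

    sched.schedule(new Runnable() {
      @Override
      public void run() {
        System.out.println("would delete");
      }
    }, 60, TimeUnit.SECONDS);

    sched.shutdown();
    boolean terminated = false;
    try {
      terminated = sched.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    if (!terminated) {
      sched.shutdownNow(); // force-cancel anything still queued
    }
    System.out.println("terminated: " + sched.isTerminated());
  }
}
```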

+ 38 - 8
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java

@@ -27,12 +27,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 
 
 import org.junit.AfterClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+
 import static org.junit.Assert.*;
 
 public class TestDeletionService {
@@ -107,12 +110,18 @@ public class TestDeletionService {
         del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
             p, null);
       }
+
+      int msecToWait = 20 * 1000;
+      for (Path p : dirs) {
+        while (msecToWait > 0 && lfs.util().exists(p)) {
+          Thread.sleep(100);
+          msecToWait -= 100;
+        }
+        assertFalse(lfs.util().exists(p));
+      }
     } finally {
       del.stop();
     }
-    for (Path p : dirs) {
-      assertFalse(lfs.util().exists(p));
-    }
   }
 
   @Test
@@ -137,14 +146,35 @@ public class TestDeletionService {
         del.delete((Long.parseLong(p.getName()) % 2) == 0 ? null : "dingo",
             p, baseDirs.toArray(new Path[4]));
       }
+
+      int msecToWait = 20 * 1000;
+      for (Path p : baseDirs) {
+        for (Path q : content) {
+          Path fp = new Path(p, q);
+          while (msecToWait > 0 && lfs.util().exists(fp)) {
+            Thread.sleep(100);
+            msecToWait -= 100;
+          }
+          assertFalse(lfs.util().exists(fp));
+        }
+      }
     } finally {
       del.stop();
     }
-    for (Path p : baseDirs) {
-      for (Path q : content) {
-        assertFalse(lfs.util().exists(new Path(p, q)));
-      }
-    }
   }
 
+  @Test
+  public void testStopWithDelayedTasks() throws Exception {
+    DeletionService del = new DeletionService(Mockito.mock(ContainerExecutor.class));
+    Configuration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 60);
+    del.init(conf);
+    del.start();
+    try {
+      del.delete("dingo", new Path("/does/not/exist"));
+    } finally {
+      del.stop();
+    }
+    assertTrue(del.isTerminated());
+  }
 }
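
The reworked assertions poll because deletions now race with service shutdown: an immediate exists() check could run before the deletion threads fire. A hedged sketch of the same bounded-poll idiom as a reusable helper (hypothetical, not part of this patch):

```java
import java.io.File;
import java.util.concurrent.Callable;

// Hypothetical helper generalizing the inline 20-second polls above.
public class WaitFor {
  static boolean waitFor(Callable<Boolean> condition, long timeoutMsec)
      throws Exception {
    long remaining = timeoutMsec;
    while (remaining > 0 && !condition.call()) {
      Thread.sleep(100);
      remaining -= 100;
    }
    return condition.call();
  }

  public static void main(String[] args) throws Exception {
    final File f = new File("/does/not/exist");
    // Wait up to 20s for the path to disappear, as the tests do inline.
    boolean gone = waitFor(new Callable<Boolean>() {
      @Override
      public Boolean call() {
        return !f.exists();
      }
    }, 20 * 1000);
    System.out.println("gone: " + gone);
  }
}
```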