Browse Source

Merge r1523109 through r1523401 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1523402 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 năm trước cách đây
mục cha
commit
959635f0e7
46 tập tin đã thay đổi với 795 bổ sung597 xóa
  1. 0 4
      hadoop-client/pom.xml
  2. 11 3
      hadoop-common-project/hadoop-common/CHANGES.txt
  3. 5 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java
  4. 10 1
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
  5. 8 8
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
  6. 12 1
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
  7. 4 4
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
  8. 3 3
      hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
  9. 6 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  10. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  11. 9 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  12. 10 55
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
  13. 120 184
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  14. 15 41
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
  15. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
  16. 7 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
  17. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java
  18. 76 100
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
  19. 73 79
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
  20. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
  21. 12 10
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
  22. 9 6
      hadoop-project/pom.xml
  23. 9 0
      hadoop-yarn-project/CHANGES.txt
  24. 35 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java
  25. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  26. 16 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  27. 22 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java
  28. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  29. 27 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
  30. 7 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  31. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
  32. 0 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  33. 0 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
  34. 7 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  35. 17 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  36. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
  37. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
  38. 92 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  39. 36 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java
  40. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
  41. 6 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
  42. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
  43. 9 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
  44. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
  45. 11 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
  46. 48 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

+ 0 - 4
hadoop-client/pom.xml

@@ -39,10 +39,6 @@
       <artifactId>hadoop-common</artifactId>
       <scope>compile</scope>
       <exclusions>
-        <exclusion>
-          <groupId>commons-httpclient</groupId>
-          <artifactId>commons-httpclient</artifactId>
-        </exclusion>
         <exclusion>
           <groupId>tomcat</groupId>
           <artifactId>jasper-compiler</artifactId>

+ 11 - 3
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -363,9 +363,6 @@ Release 2.3.0 - UNRELEASED
 
     HADOOP-9908. Fix NPE when versioninfo properties file is missing (todd)
 
-    HADOOP-9350. Hadoop not building against Java7 on OSX
-    (Robert Kanter via stevel)
-
 Release 2.1.1-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -408,6 +405,9 @@ Release 2.1.1-beta - UNRELEASED
     HADOOP-9918. Add addIfService to CompositeService (Karthik Kambatla via
     Sandy Ryza)
 
+    HADOOP-9945. HAServiceState should have a state for stopped services.
+    (Karthik Kambatla via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -460,6 +460,14 @@ Release 2.1.1-beta - UNRELEASED
     HADOOP-9958. Add old constructor back to DelegationTokenInformation to
     unbreak downstream builds. (Andrew Wang)
 
+    HADOOP-9960. Upgrade Jersey version to 1.9. (Karthik Kambatla via atm)
+
+    HADOOP-9557. hadoop-client excludes commons-httpclient. (Lohit Vijayarenu via
+    cnauroth)
+
+    HADOOP-9350. Hadoop not building against Java7 on OSX
+    (Robert Kanter via stevel)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 5 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceProtocol.java

@@ -43,13 +43,15 @@ public interface HAServiceProtocol {
   public static final long versionID = 1L;
 
   /**
-   * An HA service may be in active or standby state. During
-   * startup, it is in an unknown INITIALIZING state.
+   * An HA service may be in active or standby state. During startup, it is in
+   * an unknown INITIALIZING state. During shutdown, it is in the STOPPING state
+   * and can no longer return to active/standby states.
    */
   public enum HAServiceState {
     INITIALIZING("initializing"),
     ACTIVE("active"),
-    STANDBY("standby");
+    STANDBY("standby"),
+    STOPPING("stopping");
 
     private String name;
 

+ 10 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java

@@ -39,6 +39,12 @@ import org.jboss.netty.channel.Channel;
 public class Nfs3Utils {
   public final static String INODEID_PATH_PREFIX = "/.reserved/.inodes/";
 
+  
+  public final static String READ_RPC_START =  "READ_RPC_CALL_START____";
+  public final static String READ_RPC_END =    "READ_RPC_CALL_END______";
+  public final static String WRITE_RPC_START = "WRITE_RPC_CALL_START____";
+  public final static String WRITE_RPC_END =   "WRITE_RPC_CALL_END______";
+  
   public static String getFileIdPath(FileHandle handle) {
     return getFileIdPath(handle.getFileId());
   }
@@ -102,7 +108,10 @@ public class Nfs3Utils {
   /**
    * Send a write response to the netty network socket channel
    */
-  public static void writeChannel(Channel channel, XDR out) {
+  public static void writeChannel(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug(WRITE_RPC_END + xid);
+    }
     ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
     channel.write(outBuf);
   }

+ 8 - 8
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java

@@ -291,7 +291,7 @@ class OpenFileCtx {
         WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
             fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
       } else {
         // Handle repeated write requests(same xid or not).
         // If already replied, send reply again. If not replied, drop the
@@ -313,7 +313,7 @@ class OpenFileCtx {
             WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
                 fileWcc, request.getCount(), request.getStableHow(),
                 Nfs3Constant.WRITE_COMMIT_VERF);
-            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+            Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
           }
           updateLastAccessTime();
           
@@ -367,7 +367,7 @@ class OpenFileCtx {
         WccData fileWcc = new WccData(preOpAttr, postOpAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
         writeCtx.setReplied(true);
       }
 
@@ -392,7 +392,7 @@ class OpenFileCtx {
         WccData fileWcc = new WccData(preOpAttr, postOpAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
         writeCtx.setReplied(true);
       }
 
@@ -418,7 +418,7 @@ class OpenFileCtx {
       }
       
       updateLastAccessTime();
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
     }
   }
   
@@ -707,7 +707,7 @@ class OpenFileCtx {
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
       }
 
     } catch (IOException e) {
@@ -715,7 +715,7 @@ class OpenFileCtx {
           + offset + " and length " + data.length, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
         // Keep stream open. Either client retries or SteamMonitor closes it.
       }
 
@@ -753,7 +753,7 @@ class OpenFileCtx {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
             fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
         Nfs3Utils.writeChannel(writeCtx.getChannel(),
-            response.send(new XDR(), writeCtx.getXid()));
+            response.send(new XDR(), writeCtx.getXid()), writeCtx.getXid());
       }
     }
     

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -125,7 +125,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   public static final FsPermission umask = new FsPermission(
       (short) DEFAULT_UMASK);
   
-  private static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
+  static final Log LOG = LogFactory.getLog(RpcProgramNfs3.class);
   private static final int MAX_READ_TRANSFER_SIZE = 64 * 1024;
   private static final int MAX_WRITE_TRANSFER_SIZE = 64 * 1024;
   private static final int MAX_READDIR_TRANSFER_SIZE = 64 * 1024;
@@ -1814,9 +1814,19 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     } else if (nfsproc3 == NFSPROC3.READLINK) {
       response = readlink(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.READ) {
+      if (LOG.isDebugEnabled()) {
+          LOG.debug(Nfs3Utils.READ_RPC_START + xid);
+      }    
       response = read(xdr, securityHandler, client);
+      if (LOG.isDebugEnabled() && (nfsproc3 == NFSPROC3.READ)) {
+        LOG.debug(Nfs3Utils.READ_RPC_END + xid);
+      }
     } else if (nfsproc3 == NFSPROC3.WRITE) {
+      if (LOG.isDebugEnabled()) {
+          LOG.debug(Nfs3Utils.WRITE_RPC_START + xid);
+      }
       response = write(xdr, channel, xid, securityHandler, client);
+      // Write end debug trace is in Nfs3Utils.writeChannel
     } else if (nfsproc3 == NFSPROC3.CREATE) {
       response = create(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.MKDIR) {      
@@ -1853,6 +1863,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     if (response != null) {
       out = response.send(out, xid);
     }
+
     return out;
   }
   

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java

@@ -118,7 +118,7 @@ public class WriteManager {
     byte[] data = request.getData().array();
     if (data.length < count) {
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
-      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+      Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
       return;
     }
 
@@ -155,7 +155,7 @@ public class WriteManager {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
             fileWcc, count, request.getStableHow(),
             Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
         return;
       }
 
@@ -182,10 +182,10 @@ public class WriteManager {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, request.getStableHow(),
             Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
       } else {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid));
+        Nfs3Utils.writeChannel(channel, response.send(new XDR(), xid), xid);
       }
     }
 

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java

@@ -174,11 +174,11 @@ public class TestOutOfOrderWrite {
     XDR writeReq;
 
     writeReq = write(handle, 0x8000005c, 2000, 1000, data3);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 1);
     writeReq = write(handle, 0x8000005d, 1000, 1000, data2);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 2);
     writeReq = write(handle, 0x8000005e, 0, 1000, data1);
-    Nfs3Utils.writeChannel(channel, writeReq);
+    Nfs3Utils.writeChannel(channel, writeReq, 3);
 
     // TODO: convert to Junit test, and validate result automatically
   }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -275,6 +275,10 @@ Release 2.3.0 - UNRELEASED
     HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via 
     jing9)
 
+    HDFS-5188. In BlockPlacementPolicy, reduce the number of chooseTarget(..)
+    methods; replace HashMap with Map in parameter declarations and cleanup
+    some related code.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -338,6 +342,8 @@ Release 2.1.1-beta - UNRELEASED
 
     HDFS-5067 Support symlink operations in NFS gateway. (brandonli)
 
+    HDFS-5199 Add more debug trace for NFS READ and WRITE. (brandonli)
+
   IMPROVEMENTS
 
     HDFS-4513. Clarify in the WebHDFS REST API that all JSON respsonses may

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
 
 /** 
  * This class contains constants for configuration keys used
@@ -348,6 +349,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
   public static final long    DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
 
+  public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
+  public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
   public static final String  DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
   public static final int     DFS_REPLICATION_MAX_DEFAULT = 512;
   public static final String  DFS_DF_INTERVAL_KEY = "dfs.df.interval";

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1260,8 +1260,7 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
 
-    HashMap<Node, Node> excludedNodes
-        = new HashMap<Node, Node>();
+    final Map<Node, Node> excludedNodes = new HashMap<Node, Node>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
@@ -1273,9 +1272,7 @@ public class BlockManager {
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
       // It is costly to extract the filename for which chooseTargets is called,
       // so for now we pass in the block collection itself.
-      rw.targets = blockplacement.chooseTarget(rw.bc,
-          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
-          excludedNodes, rw.block.getNumBytes());
+      rw.chooseTargets(blockplacement, excludedNodes);
     }
 
     namesystem.writeLock();
@@ -3249,6 +3246,13 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       this.priority = priority;
       this.targets = null;
     }
+    
+    private void chooseTargets(BlockPlacementPolicy blockplacement,
+        Map<Node, Node> excludedNodes) {
+      targets = blockplacement.chooseTarget(bc.getName(),
+          additionalReplRequired, srcNode, liveReplicaNodes, false,
+          excludedNodes, block.getNumBytes());
+    }
   }
 
   /**

+ 10 - 55
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -27,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -51,25 +51,6 @@ public abstract class BlockPlacementPolicy {
     }
   }
     
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-   * to re-replicate a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param srcPath the file to which this chooseTargets is being invoked. 
-   * @param numOfReplicas additional number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
-                                             int numOfReplicas,
-                                             DatanodeDescriptor writer,
-                                             List<DatanodeDescriptor> chosenNodes,
-                                             long blocksize);
-
   /**
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
    * to re-replicate a block with size <i>blocksize</i> 
@@ -90,34 +71,8 @@ public abstract class BlockPlacementPolicy {
                                              DatanodeDescriptor writer,
                                              List<DatanodeDescriptor> chosenNodes,
                                              boolean returnChosenNodes,
-                                             HashMap<Node, Node> excludedNodes,
+                                             Map<Node, Node> excludedNodes,
                                              long blocksize);
-
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
-   * If not, return as many as we can.
-   * The base implemenatation extracts the pathname of the file from the
-   * specified srcBC, but this could be a costly operation depending on the
-   * file system implementation. Concrete implementations of this class should
-   * override this method to avoid this overhead.
-   * 
-   * @param srcBC block collection of file for which chooseTarget is invoked.
-   * @param numOfReplicas additional number of replicas wanted.
-   * @param writer the writer's machine, null if not in the cluster.
-   * @param chosenNodes datanodes that have been chosen as targets.
-   * @param blocksize size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(BlockCollection srcBC,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    return chooseTarget(srcBC.getName(), numOfReplicas, writer,
-                        chosenNodes, false, excludedNodes, blocksize);
-  }
   
   /**
    * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, 
@@ -128,7 +83,7 @@ public abstract class BlockPlacementPolicy {
    */
   DatanodeDescriptor[] chooseTarget(String src,
       int numOfReplicas, DatanodeDescriptor writer,
-      HashMap<Node, Node> excludedNodes,
+      Map<Node, Node> excludedNodes,
       long blocksize, List<DatanodeDescriptor> favoredNodes) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
@@ -183,7 +138,7 @@ public abstract class BlockPlacementPolicy {
     
   /**
    * Get an instance of the configured Block Placement Policy based on the
-   * value of the configuration paramater dfs.block.replicator.classname.
+   * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
    * 
    * @param conf the configuration to be used
    * @param stats an object that is used to retrieve the load on the cluster
@@ -193,12 +148,12 @@ public abstract class BlockPlacementPolicy {
   public static BlockPlacementPolicy getInstance(Configuration conf, 
                                                  FSClusterStats stats,
                                                  NetworkTopology clusterMap) {
-    Class<? extends BlockPlacementPolicy> replicatorClass =
-                      conf.getClass("dfs.block.replicator.classname",
-                                    BlockPlacementPolicyDefault.class,
-                                    BlockPlacementPolicy.class);
-    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
-                                                             replicatorClass, conf);
+    final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+        BlockPlacementPolicy.class);
+    final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
+        replicatorClass, conf);
     replicator.initialize(conf, stats, clusterMap);
     return replicator;
   }

+ 120 - 184
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -22,8 +22,8 @@ import static org.apache.hadoop.util.Time.now;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -57,6 +57,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     "For more information, please enable DEBUG log level on "
     + BlockPlacementPolicy.class.getName();
 
+  private static final ThreadLocal<StringBuilder> debugLoggingBuilder
+      = new ThreadLocal<StringBuilder>() {
+        @Override
+        protected StringBuilder initialValue() {
+          return new StringBuilder();
+        }
+      };
+
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
@@ -95,40 +103,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
-  protected ThreadLocal<StringBuilder> threadLocalBuilder =
-    new ThreadLocal<StringBuilder>() {
-    @Override
-    protected StringBuilder initialValue() {
-      return new StringBuilder();
-    }
-  };
-
-  @Override
-  public DatanodeDescriptor[] chooseTarget(String srcPath,
-                                    int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> chosenNodes,
-                                    long blocksize) {
-    return chooseTarget(numOfReplicas, writer, chosenNodes, false,
-        null, blocksize);
-  }
-
   @Override
   public DatanodeDescriptor[] chooseTarget(String srcPath,
                                     int numOfReplicas,
                                     DatanodeDescriptor writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
+                                    Map<Node, Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
         excludedNodes, blocksize);
   }
 
   @Override
-  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
-      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
-      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+  DatanodeDescriptor[] chooseTarget(String src,
+      int numOfReplicas,
+      DatanodeDescriptor writer,
+      Map<Node, Node> excludedNodes,
+      long blocksize,
+      List<DatanodeDescriptor> favoredNodes) {
     try {
       if (favoredNodes == null || favoredNodes.size() == 0) {
         // Favored nodes not specified, fall back to regular block placement.
@@ -137,7 +130,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             excludedNodes, blocksize);
       }
 
-      HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
+      Map<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
 
       // Choose favored nodes
@@ -181,14 +174,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /** This is the implementation. */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
+                                    Map<Node, Node> excludedNodes,
                                     long blocksize) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return new DatanodeDescriptor[0];
+      return DatanodeDescriptor.EMPTY_ARRAY;
     }
       
     if (excludedNodes == null) {
@@ -204,7 +197,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     for (DatanodeDescriptor node:chosenNodes) {
       // add localMachine and related nodes to excludedNodes
       addToExcludedNodes(node, excludedNodes);
-      adjustExcludedNodes(excludedNodes, node);
     }
       
     if (!clusterMap.contains(writer)) {
@@ -239,7 +231,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   /* choose <i>numOfReplicas</i> from all data nodes */
   private DatanodeDescriptor chooseTarget(int numOfReplicas,
                                           DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
+                                          Map<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
                                           List<DatanodeDescriptor> results,
@@ -256,7 +248,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     // Keep a copy of original excludedNodes
-    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
+    final Map<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
         new HashMap<Node, Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
@@ -316,19 +308,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return writer;
   }
     
-  /* choose <i>localMachine</i> as the target.
+  /**
+   * Choose <i>localMachine</i> as the target.
    * if <i>localMachine</i> is not available, 
    * choose a node on the same rack
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+                                             Map<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
                                              boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@@ -337,11 +329,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       // otherwise try local machine first
       Node oldNode = excludedNodes.put(localMachine, localMachine);
       if (oldNode == null) { // was not in the excluded list
-        if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
-            results, avoidStaleNodes)) {
-          results.add(localMachine);
-          // add localMachine and related nodes to excludedNode
-          addToExcludedNodes(localMachine, excludedNodes);
+        if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
+            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
           return localMachine;
         }
       } 
@@ -358,26 +347,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return number of new excluded nodes
    */
   protected int addToExcludedNodes(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes) {
+      Map<Node, Node> excludedNodes) {
     Node node = excludedNodes.put(localMachine, localMachine);
     return node == null?1:0;
   }
 
-  /* choose one node from the rack that <i>localMachine</i> is on.
+  /**
+   * Choose one node from the rack that <i>localMachine</i> is on.
    * if no such node is available, choose one node from the rack where
    * a second replica is on.
    * if still no such node is available, choose a random node 
    * in the cluster.
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+                                             Map<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
                                              boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@@ -391,9 +380,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -416,7 +403,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
   }
     
-  /* choose <i>numOfReplicas</i> nodes from the racks 
+  /** 
+   * Choose <i>numOfReplicas</i> nodes from the racks 
    * that <i>localMachine</i> is NOT on.
    * if not enough nodes are available, choose the remaining ones 
    * from the local rack
@@ -424,12 +412,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     
   protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
-                                HashMap<Node, Node> excludedNodes,
+                                Map<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
                                 List<DatanodeDescriptor> results,
                                 boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+                                    throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
@@ -443,91 +431,59 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
   }
 
-  /* Randomly choose one target from <i>nodes</i>.
-   * @return the chosen node
+  /**
+   * Randomly choose one target from the given <i>scope</i>.
+   * @return the chosen node, if there is any.
    */
-  protected DatanodeDescriptor chooseRandom(
-                                          String nodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results,
-                                          boolean avoidStaleNodes) 
-    throws NotEnoughReplicasException {
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    StringBuilder builder = null;
-    if (LOG.isDebugEnabled()) {
-      builder = threadLocalBuilder.get();
-      builder.setLength(0);
-      builder.append("[");
-    }
-    boolean badTarget = false;
-    while(numOfAvailableNodes > 0) {
-      DatanodeDescriptor chosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
-      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) { // chosenNode was not in the excluded list
-        numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, 
-                maxNodesPerRack, results, avoidStaleNodes)) {
-          results.add(chosenNode);
-          // add chosenNode and related nodes to excludedNode
-          addToExcludedNodes(chosenNode, excludedNodes);
-          adjustExcludedNodes(excludedNodes, chosenNode);
-          return chosenNode;
-        } else {
-          badTarget = true;
-        }
-      }
-    }
-
-    String detail = enableDebugLogging;
-    if (LOG.isDebugEnabled()) {
-      if (badTarget && builder != null) {
-        detail = builder.append("]").toString();
-        builder.setLength(0);
-      } else detail = "";
-    }
-    throw new NotEnoughReplicasException(detail);
+  protected DatanodeDescriptor chooseRandom(String scope,
+      Map<Node, Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes)
+          throws NotEnoughReplicasException {
+    return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
+        results, avoidStaleNodes);
   }
-    
-  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+
+  /**
+   * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+   * @return the first chosen node, if there is any.
    */
-  protected void chooseRandom(int numOfReplicas,
-                            String nodes,
-                            HashMap<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseRandom(int numOfReplicas,
+                            String scope,
+                            Map<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
                             List<DatanodeDescriptor> results,
                             boolean avoidStaleNodes)
-    throws NotEnoughReplicasException {
+                                throws NotEnoughReplicasException {
       
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
+        scope, excludedNodes.keySet());
     StringBuilder builder = null;
     if (LOG.isDebugEnabled()) {
-      builder = threadLocalBuilder.get();
+      builder = debugLoggingBuilder.get();
       builder.setLength(0);
       builder.append("[");
     }
     boolean badTarget = false;
+    DatanodeDescriptor firstChosen = null;
     while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
       DatanodeDescriptor chosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+          (DatanodeDescriptor)clusterMap.chooseRandom(scope);
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, 
-              maxNodesPerRack, results, avoidStaleNodes)) {
+        int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
+            blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
+        if (newExcludedNodes >= 0) {
           numOfReplicas--;
-          results.add(chosenNode);
-          // add chosenNode and related nodes to excludedNode
-          int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+          if (firstChosen == null) {
+            firstChosen = chosenNode;
+          }
           numOfAvailableNodes -= newExcludedNodes;
-          adjustExcludedNodes(excludedNodes, chosenNode);
         } else {
           badTarget = true;
         }
@@ -544,34 +500,44 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
       throw new NotEnoughReplicasException(detail);
     }
+    
+    return firstChosen;
   }
-  
+
   /**
-   * After choosing a node to place replica, adjust excluded nodes accordingly.
-   * It should do nothing here as chosenNode is already put into exlcudeNodes, 
-   * but it can be overridden in subclass to put more related nodes into 
-   * excludedNodes.
-   * 
-   * @param excludedNodes
-   * @param chosenNode
+   * If the given node is a good target, add it to the result list and
+   * update the excluded node map.
+   * @return -1 if the given is not a good target;
+   *         otherwise, return the number of excluded nodes added to the map.
    */
-  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
-      Node chosenNode) {
-    // do nothing here.
+  int addIfIsGoodTarget(DatanodeDescriptor node,
+      Map<Node, Node> excludedNodes,
+      long blockSize,
+      int maxNodesPerRack,
+      boolean considerLoad,
+      List<DatanodeDescriptor> results,                           
+      boolean avoidStaleNodes) {
+    if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
+        results, avoidStaleNodes)) {
+      results.add(node);
+      // add node and related nodes to excludedNode
+      return addToExcludedNodes(node, excludedNodes);
+    } else { 
+      return -1;
+    }
   }
 
-  /* judge if a node is a good target.
-   * return true if <i>node</i> has enough space, 
-   * does not have too much load, and the rack does not have too many nodes
-   */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerRack,
-                               List<DatanodeDescriptor> results, 
-                               boolean avoidStaleNodes) {
-    return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
-        results, avoidStaleNodes);
+  private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
+    if (LOG.isDebugEnabled()) {
+      // build the error message for later use.
+      debugLoggingBuilder.get()
+          .append(node).append(": ")
+          .append("Node ").append(NodeBase.getPath(node))
+          .append(" is not chosen because ")
+          .append(reason);
+    }
   }
-  
+
   /**
    * Determine if a node is a good target. 
    * 
@@ -588,28 +554,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    *         does not have too much load, 
    *         and the rack does not have too many nodes.
    */
-  protected boolean isGoodTarget(DatanodeDescriptor node,
+  private boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results,                           
                                boolean avoidStaleNodes) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the node is (being) decommissioned ");
-      }
+      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
       return false;
     }
 
     if (avoidStaleNodes) {
       if (node.isStale(this.staleInterval)) {
-        if (LOG.isDebugEnabled()) {
-          threadLocalBuilder.get().append(node.toString()).append(": ")
-              .append("Node ").append(NodeBase.getPath(node))
-              .append(" is not chosen because the node is stale ");
-        }
+        logNodeIsNotChosen(node, "the node is stale ");
         return false;
       }
     }
@@ -618,11 +576,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
     if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the node does not have enough space ");
-      }
+      logNodeIsNotChosen(node, "the node does not have enough space ");
       return false;
     }
       
@@ -634,11 +588,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         avgLoad = (double)stats.getTotalLoad()/size;
       }
       if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        if(LOG.isDebugEnabled()) {
-          threadLocalBuilder.get().append(node.toString()).append(": ")
-            .append("Node ").append(NodeBase.getPath(node))
-            .append(" is not chosen because the node is too busy ");
-        }
+        logNodeIsNotChosen(node, "the node is too busy ");
         return false;
       }
     }
@@ -646,31 +596,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     // check if the target rack has chosen too many nodes
     String rackname = node.getNetworkLocation();
     int counter=1;
-    for(Iterator<DatanodeDescriptor> iter = results.iterator();
-        iter.hasNext();) {
-      Node result = iter.next();
+    for(Node result : results) {
       if (rackname.equals(result.getNetworkLocation())) {
         counter++;
       }
     }
     if (counter>maxTargetPerRack) {
-      if(LOG.isDebugEnabled()) {
-        threadLocalBuilder.get().append(node.toString()).append(": ")
-          .append("Node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because the rack has too many chosen nodes ");
-      }
+      logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
       return false;
     }
     return true;
   }
     
-  /* Return a pipeline of nodes.
+  /**
+   * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that 
    * starts from the writer and traverses all <i>nodes</i>
    * This is basically a traveling salesman problem.
    */
-  private DatanodeDescriptor[] getPipeline(
-                                           DatanodeDescriptor writer,
+  private DatanodeDescriptor[] getPipeline(DatanodeDescriptor writer,
                                            DatanodeDescriptor[] nodes) {
     if (nodes.length==0) return nodes;
       
@@ -709,7 +653,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                   int minRacks) {
     DatanodeInfo[] locs = lBlk.getLocations();
     if (locs == null)
-      locs = new DatanodeInfo[0];
+      locs = DatanodeDescriptor.EMPTY_ARRAY;
     int numRacks = clusterMap.getNumOfRacks();
     if(numRacks <= 1) // only one rack
       return 0;
@@ -724,24 +668,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   @Override
   public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
-                                                 Block block,
-                                                 short replicationFactor,
-                                                 Collection<DatanodeDescriptor> first, 
-                                                 Collection<DatanodeDescriptor> second) {
+      Block block, short replicationFactor,
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeDescriptor oldestHeartbeatNode = null;
     long minSpace = Long.MAX_VALUE;
     DatanodeDescriptor minSpaceNode = null;
 
-    // pick replica from the first Set. If first is empty, then pick replicas
-    // from second set.
-    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
-
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
-    while (iter.hasNext() ) {
-      DatanodeDescriptor node = iter.next();
+    for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
       if(lastHeartbeat < oldestHeartbeat) {
@@ -762,12 +700,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * replica while second set contains remaining replica nodes.
    * So pick up first set if not empty. If first is empty, then pick second.
    */
-  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+  protected Collection<DatanodeDescriptor> pickupReplicaSet(
       Collection<DatanodeDescriptor> first,
       Collection<DatanodeDescriptor> second) {
-    Iterator<DatanodeDescriptor> iter =
-        first.isEmpty() ? second.iterator() : first.iterator();
-    return iter;
+    return first.isEmpty() ? second : first;
   }
   
   @VisibleForTesting

+ 15 - 41
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -65,7 +64,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
    */
   @Override
   protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
@@ -76,12 +75,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results, avoidStaleNodes)) {
-        results.add(localMachine);
-        // Nodes under same nodegroup should be excluded.
-        addNodeGroupToExcludedNodes(excludedNodes,
-            localMachine.getNetworkLocation());
+      if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
+          maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
         return localMachine;
       }
     } 
@@ -98,26 +93,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
         blocksize, maxNodesPerRack, results, avoidStaleNodes);
   }
 
-  @Override
-  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
-      Node chosenNode) {
-    // as node-group aware implementation, it should make sure no two replica
-    // are placing on the same node group.
-    addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
-  }
   
-  // add all nodes under specific nodegroup to excludedNodes.
-  private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
-      String nodeGroup) {
-    List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
-    for (Node node : leafNodes) {
-      excludedNodes.put(node, node);
-    }
-  }
-
   @Override
   protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
@@ -137,9 +116,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -165,7 +142,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
 
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      DatanodeDescriptor localMachine, Map<Node, Node> excludedNodes,
       long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
       boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
@@ -192,7 +169,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
    */
   private DatanodeDescriptor chooseLocalNodeGroup(
       NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
@@ -209,9 +186,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-        iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
+      for(DatanodeDescriptor nextNode : results) {
         if (nextNode != localMachine) {
           newLocal = nextNode;
           break;
@@ -248,10 +223,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
    * within the same nodegroup
    * @return number of new excluded nodes
    */
-  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes) {
+  @Override
+  protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
+      Map<Node, Node> excludedNodes) {
     int countOfExcludedNodes = 0;
-    String nodeGroupScope = localMachine.getNetworkLocation();
+    String nodeGroupScope = chosenNode.getNetworkLocation();
     List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
     for (Node leafNode : leafNodes) {
       Node node = excludedNodes.put(leafNode, leafNode);
@@ -274,12 +250,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
    * If first is empty, then pick second.
    */
   @Override
-  public Iterator<DatanodeDescriptor> pickupReplicaSet(
+  public Collection<DatanodeDescriptor> pickupReplicaSet(
       Collection<DatanodeDescriptor> first,
       Collection<DatanodeDescriptor> second) {
     // If no replica within same rack, return directly.
     if (first.isEmpty()) {
-      return second.iterator();
+      return second;
     }
     // Split data nodes in the first set into two sets, 
     // moreThanOne contains nodes on nodegroup with more than one replica
@@ -312,9 +288,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       }
     }
     
-    Iterator<DatanodeDescriptor> iter =
-        moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
-    return iter;
+    return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
   }
   
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -43,7 +43,8 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
-  
+  public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
+
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
   public DecommissioningStatus decommissioningStatus = new DecommissioningStatus();

+ 7 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java

@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -39,9 +40,11 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
+import org.junit.Assert;
 import org.junit.Test;
-import junit.framework.Assert;
 
 /**
  * This class tests if a balancer schedules tasks correctly.
@@ -75,10 +78,9 @@ public class TestBalancerWithNodeGroup {
     Configuration conf = new HdfsConfiguration();
     TestBalancer.initConf(conf);
     conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
-        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
-    conf.set("dfs.block.replicator.classname", 
-        "org.apache.hadoop.hdfs.server.blockmanagement." +
-        "BlockPlacementPolicyWithNodeGroup");
+        NetworkTopologyWithNodeGroup.class.getName());
+    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, 
+        BlockPlacementPolicyWithNodeGroup.class.getName());
     return conf;
   }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java

@@ -157,8 +157,8 @@ public class TestRBWBlockInvalidation {
     // in the context of the test, whereas a random one is more accurate
     // to what is seen in real clusters (nodes have random amounts of free
     // space)
-    conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
-        BlockPlacementPolicy.class); 
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        RandomDeleterPolicy.class, BlockPlacementPolicy.class); 
 
     // Speed up the test a bit with faster heartbeats.
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);

+ 76 - 100
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -138,30 +138,25 @@ public class TestReplicationPolicy {
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
 
-    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(4);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@@ -173,15 +168,38 @@ public class TestReplicationPolicy {
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
   }
 
+  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+    return chooseTarget(numOfReplicas, dataNodes[0]);
+  }
+
+  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer) {
+    return chooseTarget(numOfReplicas, writer,
+        new ArrayList<DatanodeDescriptor>());
+  }
+
+  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      List<DatanodeDescriptor> chosenNodes) {
+    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
+  }
+
+  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
+  }
+
+  private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      List<DatanodeDescriptor> chosenNodes, Map<Node, Node> excludedNodes) {
+    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
+  }
+
   private static DatanodeDescriptor[] chooseTarget(
-      BlockPlacementPolicyDefault policy,
       int numOfReplicas,
       DatanodeDescriptor writer,
       List<DatanodeDescriptor> chosenNodes,
-      HashMap<Node, Node> excludedNodes,
-      long blocksize) {
-    return policy.chooseTarget(numOfReplicas, writer, chosenNodes, false,
-        excludedNodes, blocksize);
+      Map<Node, Node> excludedNodes) {
+    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
+        false, excludedNodes, BLOCK_SIZE);
   }
 
   /**
@@ -196,28 +214,24 @@ public class TestReplicationPolicy {
   public void testChooseTarget2() throws Exception { 
     HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
-    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     
     excludedNodes = new HashMap<Node, Node>();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = chooseTarget(repl, 0, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes, excludedNodes);
     assertEquals(targets.length, 0);
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = chooseTarget(repl, 2, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes, excludedNodes);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@@ -225,8 +239,7 @@ public class TestReplicationPolicy {
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = chooseTarget(repl, 3, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(3, chosenNodes, excludedNodes);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@@ -235,8 +248,7 @@ public class TestReplicationPolicy {
     excludedNodes.clear();
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = chooseTarget(repl, 4, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(4, chosenNodes, excludedNodes);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     for(int i=1; i<4; i++) {
@@ -250,7 +262,7 @@ public class TestReplicationPolicy {
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
     chosenNodes.add(dataNodes[2]);
-    targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
+    targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
         excludedNodes, BLOCK_SIZE);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
@@ -276,30 +288,25 @@ public class TestReplicationPolicy {
         (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
         
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
     
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[1]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[1]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(4);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[1]);
     for(int i=1; i<4; i++) {
@@ -332,23 +339,19 @@ public class TestReplicationPolicy {
     }
       
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
       assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@@ -377,21 +380,17 @@ public class TestReplicationPolicy {
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, writerDesc,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0, writerDesc);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename, 1, writerDesc,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1, writerDesc);
     assertEquals(targets.length, 1);
 
-    targets = replicator.chooseTarget(filename, 2, writerDesc,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2, writerDesc);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 3, writerDesc,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3, writerDesc);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@@ -435,9 +434,7 @@ public class TestReplicationPolicy {
     
     // try to choose NUM_OF_DATANODES which is more than actually available
     // nodes.
-    DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 
-        NUM_OF_DATANODES, dataNodes[0], new ArrayList<DatanodeDescriptor>(),
-        BLOCK_SIZE);
+    DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
     assertEquals(targets.length, NUM_OF_DATANODES - 2);
 
     final List<LoggingEvent> log = appender.getLog();
@@ -480,17 +477,14 @@ public class TestReplicationPolicy {
     DatanodeDescriptor[] targets;
     // We set the datanode[0] as stale, thus should choose datanode[1] since
     // datanode[1] is on the same rack with datanode[0] (writer)
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
 
     HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
     excludedNodes.put(dataNodes[1], dataNodes[1]);
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
-    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
-    targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
-        BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     
@@ -517,33 +511,27 @@ public class TestReplicationPolicy {
     namenode.getNamesystem().getBlockManager()
       .getDatanodeManager().getHeartbeatManager().heartbeatCheck();
 
-    DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    DatanodeDescriptor[] targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
     // Since we have 6 datanodes total, stale nodes should
     // not be returned until we ask for more than 3 targets
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
 
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
     assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
 
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
     assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
     assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
 
-    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(4);
     assertEquals(targets.length, 4);
     assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
     assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
@@ -596,7 +584,8 @@ public class TestReplicationPolicy {
       BlockPlacementPolicy replicator = miniCluster.getNameNode()
           .getNamesystem().getBlockManager().getBlockPlacementPolicy();
       DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
+
       assertEquals(targets.length, 3);
       assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
       
@@ -620,7 +609,7 @@ public class TestReplicationPolicy {
           .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
       // Call chooseTarget
       targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
       assertEquals(targets.length, 3);
       assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
       
@@ -642,8 +631,7 @@ public class TestReplicationPolicy {
       assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
           .getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
       // Call chooseTarget
-      targets = replicator.chooseTarget(filename, 3,
-          staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+      targets = chooseTarget(3, staleNodeInfo);
       assertEquals(targets.length, 3);
       assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
     } finally {
@@ -664,23 +652,19 @@ public class TestReplicationPolicy {
     chosenNodes.add(dataNodes[0]);    
     DatanodeDescriptor[] targets;
     
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename,
-                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(3, chosenNodes);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
@@ -700,17 +684,14 @@ public class TestReplicationPolicy {
     chosenNodes.add(dataNodes[1]);
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
@@ -730,29 +711,24 @@ public class TestReplicationPolicy {
     chosenNodes.add(dataNodes[2]);
     
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
     
-    targets = replicator.chooseTarget(filename,
-                               1, dataNodes[2], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
 
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(filename,
-                               2, dataNodes[2], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, dataNodes[2], chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }

+ 73 - 79
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.After;
@@ -101,10 +102,10 @@ public class TestReplicationPolicyWithNodeGroup {
     FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
     CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
     // Set properties to make HDFS aware of NodeGroup.
-    CONF.set("dfs.block.replicator.classname", 
-        "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
+    CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, 
+        BlockPlacementPolicyWithNodeGroup.class.getName());
     CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
-        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+        NetworkTopologyWithNodeGroup.class.getName());
     
     File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);
     
@@ -156,6 +157,35 @@ public class TestReplicationPolicyWithNodeGroup {
     return true;
   }
   
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
+    return chooseTarget(numOfReplicas, dataNodes[0]);
+  }
+
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer) {
+    return chooseTarget(numOfReplicas, writer,
+        new ArrayList<DatanodeDescriptor>());
+  }
+
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      List<DatanodeDescriptor> chosenNodes) {
+    return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
+  }
+
+  private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+      DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null);
+  }
+
+  private DatanodeDescriptor[] chooseTarget(
+      int numOfReplicas,
+      DatanodeDescriptor writer,
+      List<DatanodeDescriptor> chosenNodes,
+      Map<Node, Node> excludedNodes) {
+    return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
+        false, excludedNodes, BLOCK_SIZE);
+  }
+
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on 
@@ -172,31 +202,26 @@ public class TestReplicationPolicyWithNodeGroup {
         HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
 
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
 
-    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(4);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@@ -235,7 +260,7 @@ public class TestReplicationPolicyWithNodeGroup {
 
     excludedNodes = new HashMap<Node, Node>();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false, 
+    targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false, 
         excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
@@ -252,7 +277,7 @@ public class TestReplicationPolicyWithNodeGroup {
     chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
     chosenNodes.add(dataNodes[2]);
-    targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
+    targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
         excludedNodes, BLOCK_SIZE);
     System.out.println("targets=" + Arrays.asList(targets));
     assertEquals(2, targets.length);
@@ -278,30 +303,25 @@ public class TestReplicationPolicyWithNodeGroup {
         (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
 
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[1]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[1]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 4, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(4);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[1]);
     assertTrue(cluster.isNodeGroupAware());
@@ -333,23 +353,19 @@ public class TestReplicationPolicyWithNodeGroup {
     }
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename, 1, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
 
-    targets = replicator.chooseTarget(filename, 2, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 3, dataNodes[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
       assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@@ -371,21 +387,17 @@ public class TestReplicationPolicyWithNodeGroup {
   public void testChooseTarget5() throws Exception {
     setupDataNodeCapacity();
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, NODE,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0, NODE);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename, 1, NODE,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1, NODE);
     assertEquals(targets.length, 1);
 
-    targets = replicator.chooseTarget(filename, 2, NODE,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2, NODE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
 
-    targets = replicator.chooseTarget(filename, 3, NODE,
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3, NODE);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@@ -406,23 +418,19 @@ public class TestReplicationPolicyWithNodeGroup {
     chosenNodes.add(dataNodes[0]);
     DatanodeDescriptor[] targets;
     
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename,
-                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(3, chosenNodes);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
@@ -444,17 +452,14 @@ public class TestReplicationPolicyWithNodeGroup {
     chosenNodes.add(dataNodes[1]);
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
 
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) && 
         cluster.isOnSameRack(dataNodes[0], targets[1]));
@@ -475,30 +480,26 @@ public class TestReplicationPolicyWithNodeGroup {
     chosenNodes.add(dataNodes[3]);
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename,
-                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(0, chosenNodes);
     assertEquals(targets.length, 0);
 
-    targets = replicator.chooseTarget(filename,
-                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, chosenNodes);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
 
-    targets = replicator.chooseTarget(filename,
-                               1, dataNodes[3], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, dataNodes[3], chosenNodes);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
     assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
 
-    targets = replicator.chooseTarget(filename,
-                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(2, chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
-    targets = replicator.chooseTarget(filename,
-                               2, dataNodes[3], chosenNodes, BLOCK_SIZE);
+
+    targets = chooseTarget(2, dataNodes[3], chosenNodes);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
   }
@@ -584,21 +585,17 @@ public class TestReplicationPolicyWithNodeGroup {
     }
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(1, dataNodesInBoundaryCase[0]);
     assertEquals(targets.length, 1);
 
-    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
     assertEquals(targets.length, 3);
     assertTrue(checkTargetsOnDifferentNodeGroup(targets));
   }
@@ -621,8 +618,7 @@ public class TestReplicationPolicyWithNodeGroup {
     chosenNodes.add(dataNodesInBoundaryCase[0]);
     chosenNodes.add(dataNodesInBoundaryCase[5]);
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
-        chosenNodes, BLOCK_SIZE);
+    targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
     assertFalse(cluster.isOnSameNodeGroup(targets[0], 
         dataNodesInBoundaryCase[0]));
     assertFalse(cluster.isOnSameNodeGroup(targets[0],
@@ -661,14 +657,12 @@ public class TestReplicationPolicyWithNodeGroup {
 
     DatanodeDescriptor[] targets;
     // Test normal case -- 3 replicas
-    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
     assertEquals(targets.length, 3);
     assertTrue(checkTargetsOnDifferentNodeGroup(targets));
 
     // Test special case -- replica number over node groups.
-    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
-        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    targets = chooseTarget(10, dataNodesInMoreTargetsCase[0]);
     assertTrue(checkTargetsOnDifferentNodeGroup(targets));
     // Verify it only can find 6 targets for placing replicas.
     assertEquals(targets.length, 6);

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java

@@ -96,8 +96,8 @@ public class TestDNFencing {
     // Increase max streams so that we re-replicate quickly.
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
     // See RandomDeleterPolicy javadoc.
-    conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
-        BlockPlacementPolicy.class); 
+    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        RandomDeleterPolicy.class, BlockPlacementPolicy.class); 
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())

+ 12 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -57,6 +58,8 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
+import com.sun.research.ws.wadl.Response;
+
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
  */
@@ -194,7 +197,15 @@ public abstract class RMCommunicator extends AbstractService
       FinishApplicationMasterRequest request =
           FinishApplicationMasterRequest.newInstance(finishState,
             sb.toString(), historyUrl);
-      scheduler.finishApplicationMaster(request);
+      while (true) {
+        FinishApplicationMasterResponse response =
+            scheduler.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(rmPollInterval);
+      }
     } catch(Exception are) {
       LOG.error("Exception while unregistering ", are);
     }
@@ -237,15 +248,6 @@ public abstract class RMCommunicator extends AbstractService
             } catch (YarnRuntimeException e) {
               LOG.error("Error communicating with RM: " + e.getMessage() , e);
               return;
-            } catch (InvalidToken e) {
-              // This can happen if the RM has been restarted, since currently
-              // when RM restarts AMRMToken is not populated back to
-              // AMRMTokenSecretManager yet. Once this is fixed, no need
-              // to send JOB_AM_REBOOT event in this method any more.
-              eventHandler.handle(new JobEvent(job.getID(),
-                JobEventType.JOB_AM_REBOOT));
-              LOG.error("Error in authencating with RM: " ,e);
-              return;
             } catch (Exception e) {
               LOG.error("ERROR IN CONTACTING RM. ", e);
               continue;

+ 9 - 6
hadoop-project/pom.xml

@@ -59,6 +59,9 @@
     <hadoop.common.build.dir>${basedir}/../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
     <java.security.egd>file:///dev/urandom</java.security.egd>
 
+    <!-- jersey version -->
+    <jersey.version>1.9</jersey.version>
+
     <!-- ProtocolBuffer version, used to verify the protoc version and -->
     <!-- define the protobuf JAR version                               -->
     <protobuf.version>2.5.0</protobuf.version>
@@ -365,12 +368,12 @@
       <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-core</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
       </dependency>
       <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-json</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
         <exclusions>
           <exclusion>
             <groupId>javax.xml.stream</groupId>
@@ -381,7 +384,7 @@
       <dependency>
         <groupId>com.sun.jersey</groupId>
         <artifactId>jersey-server</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
       </dependency>
 
       <dependency>
@@ -399,19 +402,19 @@
       <dependency>
         <groupId>com.sun.jersey.contribs</groupId>
         <artifactId>jersey-guice</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
       </dependency>
 
       <dependency>
         <groupId>com.sun.jersey.jersey-test-framework</groupId>
         <artifactId>jersey-test-framework-core</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
         <scope>test</scope>
       </dependency>
       <dependency>
         <groupId>com.sun.jersey.jersey-test-framework</groupId>
         <artifactId>jersey-test-framework-grizzly2</artifactId>
-        <version>1.8</version>
+        <version>${jersey.version}</version>
       </dependency>
 
       <dependency>

+ 9 - 0
hadoop-yarn-project/CHANGES.txt

@@ -194,6 +194,15 @@ Release 2.1.1-beta - UNRELEASED
     YARN-1194. TestContainerLogsPage fails with native builds (Roman Shaposhnik
     via jlowe)
 
+    YARN-1116. Populate AMRMTokens back to AMRMTokenSecretManager after RM
+    restarts (Jian He via bikas)
+
+    YARN-1189. NMTokenSecretManagerInNM is not being told when applications
+    have finished (Omkar Vinit Joshi via jlowe)
+
+    YARN-540. Race condition causing RM to potentially relaunch already
+    unregistered AMs on RM restart (Jian He via bikas)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 35 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/FinishApplicationMasterResponse.java

@@ -26,21 +26,52 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
- * <p>The response sent by the <code>ResourceManager</code> to a 
- * <code>ApplicationMaster</code> on it's completion.</p>
+ * <p>
+ * The response sent by the <code>ResourceManager</code> to a
+ * <code>ApplicationMaster</code> on it's completion.
+ * </p>
  * 
- * <p>Currently, this is empty.</p>
+ * <p>
+ * The response, includes:
+ * <ul>
+ * <li>A flag which indicates that the application has successfully unregistered
+ * with the RM and the application can safely stop.</li>
+ * </ul>
+ * </p>
+ * Note: The flag indicates whether the application has successfully
+ * unregistered and is safe to stop. The application may stop after the flag is
+ * true. If the application stops before the flag is true then the RM may retry
+ * the application .
  * 
  * @see ApplicationMasterProtocol#finishApplicationMaster(FinishApplicationMasterRequest)
  */
 @Public
 @Stable
 public abstract class FinishApplicationMasterResponse {
+
   @Private
   @Unstable
-  public static FinishApplicationMasterResponse newInstance() {
+  public static FinishApplicationMasterResponse newInstance(
+      boolean isRemovedFromRMStateStore) {
     FinishApplicationMasterResponse response =
         Records.newRecord(FinishApplicationMasterResponse.class);
+    response.setIsUnregistered(isRemovedFromRMStateStore);
     return response;
   }
+
+  /**
+   * Get the flag which indicates that the application has successfully
+   * unregistered with the RM and the application can safely stop.
+   */
+  @Public
+  @Stable
+  public abstract boolean getIsUnregistered();
+
+  /**
+   * Set the flag which indicates that the application has successfully
+   * unregistered with the RM and the application can safely stop.
+   */
+  @Private
+  @Unstable
+  public abstract void setIsUnregistered(boolean isUnregistered);
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -52,6 +52,7 @@ message FinishApplicationMasterRequestProto {
 }
 
 message FinishApplicationMasterResponseProto {
+  optional bool isUnregistered = 1 [default = false];
 }
 
 message AllocateRequestProto {

+ 16 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -300,11 +301,24 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       String appMessage, String appTrackingUrl) throws YarnException,
       IOException {
     Preconditions.checkArgument(appStatus != null,
-        "AppStatus should not be null.");
+      "AppStatus should not be null.");
     FinishApplicationMasterRequest request =
         FinishApplicationMasterRequest.newInstance(appStatus, appMessage,
           appTrackingUrl);
-    rmClient.finishApplicationMaster(request);
+    try {
+      while (true) {
+        FinishApplicationMasterResponse response =
+            rmClient.finishApplicationMaster(request);
+        if (response.getIsUnregistered()) {
+          break;
+        }
+        LOG.info("Waiting for application to be successfully unregistered.");
+        Thread.sleep(100);
+      }
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted while waiting for application"
+          + " to be removed from RMStateStore");
+    }
   }
   
   @Override

+ 22 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/FinishApplicationMasterResponsePBImpl.java

@@ -22,7 +22,9 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.FinishApplicationMasterResponseProtoOrBuilder;
 
 import com.google.protobuf.TextFormat;
 
@@ -67,4 +69,24 @@ public class FinishApplicationMasterResponsePBImpl extends FinishApplicationMast
   public String toString() {
     return TextFormat.shortDebugString(getProto());
   }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = FinishApplicationMasterResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public boolean getIsUnregistered() {
+    FinishApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getIsUnregistered();
+  }
+
+  @Override
+  public void setIsUnregistered(boolean isUnregistered) {
+    maybeInitBuilder();
+    builder.setIsUnregistered(isUnregistered);
+  }
 }  

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -395,6 +395,7 @@ public class ApplicationImpl implements Application {
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppFinishedEvent(app.appId));
 
+      app.context.getNMTokenSecretManager().appFinished(app.getAppId());
     }
   }
 

+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.refEq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
@@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Test;
@@ -413,6 +415,27 @@ public class TestApplication {
     }
   }
 
+  @Test
+  public void testNMTokenSecretManagerCleanup() {
+    WrappedApplication wa = null;
+    try {
+      wa = new WrappedApplication(1, 314159265358979L, "yak", 1);
+      wa.initApplication();
+      wa.initContainer(0);
+      assertEquals(ApplicationState.INITING, wa.app.getApplicationState());
+      assertEquals(1, wa.app.getContainers().size());
+      wa.appFinished();
+      wa.containerFinished(0);
+      wa.appResourcesCleanedup();
+      assertEquals(ApplicationState.FINISHED, wa.app.getApplicationState());
+      verify(wa.nmTokenSecretMgr).appFinished(eq(wa.appId));
+    } finally {
+      if (wa != null) {
+        wa.finished();
+      }
+    }
+  }
+
   private class ContainerKillMatcher extends ArgumentMatcher<ContainerEvent> {
     private ContainerId cId;
 
@@ -460,6 +483,7 @@ public class TestApplication {
     final List<Container> containers;
     final Context context;
     final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
+    final NMTokenSecretManagerInNM nmTokenSecretMgr;
     
     final ApplicationId appId;
     final Application app;
@@ -486,12 +510,15 @@ public class TestApplication {
       dispatcher.register(ContainerEventType.class, containerBus);
       dispatcher.register(LogHandlerEventType.class, logAggregationBus);
 
+      nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class);
+
       context = mock(Context.class);
       
       when(context.getContainerTokenSecretManager()).thenReturn(
         new NMContainerTokenSecretManager(conf));
       when(context.getApplicationACLsManager()).thenReturn(
         new ApplicationACLsManager(conf));
+      when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr);
       
       // Setting master key
       MasterKey masterKey = new MasterKeyPBImpl();

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@@ -303,9 +304,12 @@ public class ApplicationMasterService extends AbstractService implements
               .getTrackingUrl(), request.getFinalApplicationStatus(), request
               .getDiagnostics()));
 
-      FinishApplicationMasterResponse response = recordFactory
-          .newRecordInstance(FinishApplicationMasterResponse.class);
-      return response;
+      if (rmContext.getRMApps().get(applicationAttemptId.getApplicationId())
+          .isAppSafeToUnregister()) {
+        return FinishApplicationMasterResponse.newInstance(true);
+      } else {
+        return FinishApplicationMasterResponse.newInstance(false);
+      }
     }
   }
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java

@@ -414,8 +414,8 @@ public class ClientRMService extends AbstractService implements
       }
 
       if (applicationStates != null && !applicationStates.isEmpty()) {
-        if (!applicationStates.contains(RMServerUtils
-            .createApplicationState(application.getState()))) {
+        if (!applicationStates.contains(application
+            .createApplicationState())) {
           continue;
         }
       }

+ 0 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -186,10 +186,6 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       
       completedApps.add(applicationId);  
       writeAuditLog(applicationId);
-      
-      // application completely done. Remove from state
-      RMStateStore store = rmContext.getStateStore();
-      store.removeApplication(rmContext.getRMApps().get(applicationId));
     }
   }
 

+ 0 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java

@@ -115,27 +115,4 @@ public class RMServerUtils {
       }
     }
   }
-
-  public static YarnApplicationState createApplicationState(RMAppState rmAppState) {
-    switch(rmAppState) {
-    case NEW:
-      return YarnApplicationState.NEW;
-    case NEW_SAVING:
-      return YarnApplicationState.NEW_SAVING;
-    case SUBMITTED:
-      return YarnApplicationState.SUBMITTED;
-    case ACCEPTED:
-      return YarnApplicationState.ACCEPTED;
-    case RUNNING:
-      return YarnApplicationState.RUNNING;
-    case FINISHING:
-    case FINISHED:
-      return YarnApplicationState.FINISHED;
-    case KILLED:
-      return YarnApplicationState.KILLED;
-    case FAILED:
-      return YarnApplicationState.FAILED;
-    }
-    throw new YarnRuntimeException("Unknown state passed!");
-  }
 }

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -108,7 +109,9 @@ public class MemoryRMStateStore extends RMStateStore {
 
     ApplicationState appState = state.getApplicationState().get(
         attemptState.getAttemptId().getApplicationId());
-    assert appState != null;
+    if (appState == null) {
+      throw new YarnRuntimeException("Application doesn't exist");
+    }
 
     if (appState.attempts.containsKey(attemptState.getAttemptId())) {
       Exception e = new IOException("Attempt: " +
@@ -125,7 +128,9 @@ public class MemoryRMStateStore extends RMStateStore {
                                                             throws Exception {
     ApplicationId appId = appState.getAppId();
     ApplicationState removed = state.appState.remove(appId);
-    assert removed != null;
+    if (removed == null) {
+      throw new YarnRuntimeException("Removing non-exsisting application state");
+    }
   }
 
   @Override

+ 17 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRemovedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStoredEvent;
 
@@ -482,12 +483,15 @@ public abstract class RMStateStore extends AbstractService {
           ApplicationState appState = 
                           ((RMStateStoreRemoveAppEvent) event).getAppState();
           ApplicationId appId = appState.getAppId();
-          
+          Exception removedException = null;
           LOG.info("Removing info for app: " + appId);
           try {
             removeApplicationState(appState);
           } catch (Exception e) {
             LOG.error("Error removing app: " + appId, e);
+            removedException = e;
+          } finally {
+            notifyDoneRemovingApplcation(appId, removedException);
           }
         }
         break;
@@ -521,7 +525,18 @@ public abstract class RMStateStore extends AbstractService {
     rmDispatcher.getEventHandler().handle(
         new RMAppAttemptStoredEvent(attemptId, storedException));
   }
-  
+
+  @SuppressWarnings("unchecked")
+  /**
+   * This is to notify RMApp that this application has been removed from
+   * RMStateStore
+   */
+  private void notifyDoneRemovingApplcation(ApplicationId appId,
+      Exception removedException) {
+    rmDispatcher.getEventHandler().handle(
+      new RMAppRemovedEvent(appId, removedException));
+  }
+
   /**
    * EventHandler implementation which forward events to the FSRMStateStore
    * This hides the EventHandle methods of the store from its public interface 

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -194,4 +195,20 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return the application type.
    */
   String getApplicationType(); 
+
+  /**
+   * Check whether this application is safe to unregister.
+   * An application is deemed to be safe to unregister if it is an unmanaged
+   * AM or its state has been removed from state store.
+   * @return the flag which indicates whether this application is safe to
+   *         unregister.
+   */
+  boolean isAppSafeToUnregister();
+
+  /**
+   * Create the external user-facing state of ApplicationMaster from the
+   * current state of the {@link RMApp}.
+   * @return the external user-facing state of ApplicationMaster.
+   */
+  YarnApplicationState createApplicationState();
 }

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java

@@ -27,11 +27,14 @@ public enum RMAppEventType {
   // Source: RMAppAttempt
   APP_REJECTED,
   APP_ACCEPTED,
-  APP_SAVED,
   ATTEMPT_REGISTERED,
-  ATTEMPT_FINISHING,
+  ATTEMPT_UNREGISTERED,
   ATTEMPT_FINISHED, // Will send the final state
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,
-  NODE_UPDATE
+  NODE_UPDATE,
+
+  // Source: RMStateStore
+  APP_SAVED,
+  APP_REMOVED
 }

+ 92 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNodeUpdateEvent.RMAppNodeUpdateType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -109,6 +111,8 @@ public class RMAppImpl implements RMApp, Recoverable {
   private static final FinalTransition FINAL_TRANSITION = new FinalTransition();
   private static final AppFinishedTransition FINISHED_TRANSITION =
       new AppFinishedTransition();
+  private boolean isAppRemovalRequestSent = false;
+  private RMAppState previousStateAtRemoving;
 
   private static final StateMachineFactory<RMAppImpl,
                                            RMAppState,
@@ -167,8 +171,9 @@ public class RMAppImpl implements RMApp, Recoverable {
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
-    .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
-        RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.REMOVING,
+          RMAppEventType.ATTEMPT_UNREGISTERED,
+        new RMAppRemovingTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
     .addTransition(RMAppState.RUNNING,
@@ -178,6 +183,17 @@ public class RMAppImpl implements RMApp, Recoverable {
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
+     // Transitions from REMOVING state
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHING,
+        RMAppEventType.APP_REMOVED,  new RMAppFinishingTransition())
+    .addTransition(RMAppState.REMOVING, RMAppState.FINISHED,
+        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+    .addTransition(RMAppState.REMOVING, RMAppState.KILLED,
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    // ignorable transitions
+    .addTransition(RMAppState.REMOVING, RMAppState.REMOVING,
+        RMAppEventType.NODE_UPDATE)
+
      // Transitions from FINISHING state
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
@@ -185,36 +201,34 @@ public class RMAppImpl implements RMApp, Recoverable {
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
     // ignorable transitions
     .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
-        RMAppEventType.NODE_UPDATE)
+      EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.APP_REMOVED))
 
      // Transitions from FINISHED state
-    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
-        RMAppEventType.KILL)
      // ignorable transitions
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         EnumSet.of(
             RMAppEventType.NODE_UPDATE,
-            RMAppEventType.ATTEMPT_FINISHING,
-            RMAppEventType.ATTEMPT_FINISHED))
+            RMAppEventType.ATTEMPT_UNREGISTERED,
+            RMAppEventType.ATTEMPT_FINISHED,
+            RMAppEventType.KILL,
+            RMAppEventType.APP_REMOVED))
 
      // Transitions from FAILED state
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
-        EnumSet.of(RMAppEventType.KILL, RMAppEventType.APP_SAVED))
      // ignorable transitions
-    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
-        RMAppEventType.NODE_UPDATE)
+    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+          RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      // Transitions from KILLED state
+     // ignorable transitions
     .addTransition(
         RMAppState.KILLED,
         RMAppState.KILLED,
         EnumSet.of(RMAppEventType.APP_ACCEPTED,
             RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
             RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
-            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.APP_SAVED))
-     // ignorable transitions
-    .addTransition(RMAppState.KILLED, RMAppState.KILLED,
-        RMAppEventType.NODE_UPDATE)
+            RMAppEventType.ATTEMPT_KILLED, RMAppEventType.NODE_UPDATE,
+            RMAppEventType.APP_SAVED, RMAppEventType.APP_REMOVED))
 
      .installTopology();
 
@@ -384,6 +398,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     case SUBMITTED:
     case ACCEPTED:
     case RUNNING:
+    case REMOVING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
     case FINISHING:
@@ -475,7 +490,7 @@ public class RMAppImpl implements RMApp, Recoverable {
       return BuilderUtils.newApplicationReport(this.applicationId,
           currentApplicationAttemptId, this.user, this.queue,
           this.name, host, rpcPort, clientToAMToken,
-          RMServerUtils.createApplicationState(this.stateMachine.getCurrentState()), diags,
+          createApplicationState(), diags,
           trackingUrl, this.startTime, this.finishTime, finishState,
           appUsageReport, origTrackingUrl, progress, this.applicationType, 
           amrmToken);
@@ -569,7 +584,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   }
   
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = state.getApplicationState().get(getApplicationId());
     LOG.info("Recovering app: " + getApplicationId() + " with " + 
             + appState.getAttemptCount() + " attempts");
@@ -657,6 +672,15 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
   }
 
+  private static final class RMAppRemovingTransition extends RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      LOG.info("Removing application with id " + app.applicationId);
+      app.removeApplicationState();
+      app.previousStateAtRemoving = app.getState();
+    }
+  }
+
   private static class AppFinishedTransition extends FinalTransition {
     public void transition(RMAppImpl app, RMAppEvent event) {
       RMAppFinishedAttemptEvent finishedEvent =
@@ -712,6 +736,9 @@ public class RMAppImpl implements RMApp, Recoverable {
       if (app.getState() != RMAppState.FINISHING) {
         app.finishTime = System.currentTimeMillis();
       }
+      // application completely done and remove from state store.
+      app.removeApplicationState();
+
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -764,4 +791,52 @@ public class RMAppImpl implements RMApp, Recoverable {
   public String getApplicationType() {
     return this.applicationType;
   }
+
+  @Override
+  public boolean isAppSafeToUnregister() {
+    RMAppState state = getState();
+    return state.equals(RMAppState.FINISHING)
+        || state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
+        || state.equals(RMAppState.KILLED) ||
+        // If this is an unmanaged AM, we are safe to unregister since unmanaged
+        // AM will immediately go to FINISHED state on AM unregistration
+        getApplicationSubmissionContext().getUnmanagedAM();
+  }
+
+  @Override
+  public YarnApplicationState createApplicationState() {
+    RMAppState rmAppState = getState();
+    // If App is in REMOVING state, return its previous state.
+    if (rmAppState.equals(RMAppState.REMOVING)) {
+      rmAppState = previousStateAtRemoving;
+    }
+    switch (rmAppState) {
+    case NEW:
+      return YarnApplicationState.NEW;
+    case NEW_SAVING:
+      return YarnApplicationState.NEW_SAVING;
+    case SUBMITTED:
+      return YarnApplicationState.SUBMITTED;
+    case ACCEPTED:
+      return YarnApplicationState.ACCEPTED;
+    case RUNNING:
+      return YarnApplicationState.RUNNING;
+    case FINISHING:
+    case FINISHED:
+      return YarnApplicationState.FINISHED;
+    case KILLED:
+      return YarnApplicationState.KILLED;
+    case FAILED:
+      return YarnApplicationState.FAILED;
+    default:
+      throw new YarnRuntimeException("Unknown state passed!");
+    }
+  }
+
+  private void removeApplicationState(){
+    if (!isAppRemovalRequestSent) {
+      rmContext.getStateStore().removeApplication(this);
+      isAppRemovalRequestSent = true;
+    }
+  }
 }

+ 36 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRemovedEvent.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+public class RMAppRemovedEvent extends RMAppEvent {
+
+  private final Exception removedException;
+
+  public RMAppRemovedEvent(ApplicationId appId, Exception removedException) {
+    super(appId, RMAppEventType.APP_REMOVED);
+    this.removedException = removedException;
+  }
+
+  public Exception getRemovedException() {
+    return removedException;
+  }
+
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java

@@ -24,6 +24,7 @@ public enum RMAppState {
   SUBMITTED,
   ACCEPTED,
   RUNNING,
+  REMOVING,
   FINISHING,
   FINISHED,
   FAILED,

+ 6 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -675,7 +676,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   }
 
   @Override
-  public void recover(RMState state) {
+  public void recover(RMState state) throws Exception{
     ApplicationState appState = 
         state.getApplicationState().get(getAppAttemptId().getApplicationId());
     ApplicationAttemptState attemptState = appState.getAttempt(getAppAttemptId());
@@ -690,7 +691,8 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
                                  RMAppAttemptEventType.RECOVER));
   }
 
-  private void recoverAppAttemptCredentials(Credentials appAttemptTokens) {
+  private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
+      throws IOException {
     if (appAttemptTokens == null) {
       return;
     }
@@ -707,11 +709,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     this.amrmToken =
         (Token<AMRMTokenIdentifier>) appAttemptTokens
           .getToken(RMStateStore.AM_RM_TOKEN_SERVICE);
-
-    // For now, no need to populate tokens back to AMRMTokenSecretManager,
-    // because running attempts are rebooted. Later in work-preserve restart,
-    // we'll create NEW->RUNNING transition in which the restored tokens will be
-    // added to the secret manager
+    rmContext.getAMRMTokenSecretManager().addPersistedPassword(this.amrmToken);
   }
 
   private static class BaseTransition implements
@@ -1149,7 +1147,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       ApplicationId applicationId =
           appAttempt.getAppAttemptId().getApplicationId();
       appAttempt.eventHandler.handle(
-          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
+          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_UNREGISTERED));
       return RMAppAttemptState.FINISHING;
     }
   }

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Timer;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -123,6 +125,19 @@ public class AMRMTokenSecretManager extends
     return password;
   }
 
+  /**
+   * Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
+   */
+  public synchronized void
+      addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
+    AMRMTokenIdentifier identifier = token.decodeIdentifier();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+    }
+    this.passwords.put(identifier.getApplicationAttemptId(),
+      token.getPassword());
+  }
+
   /**
    * Retrieve the password for the given {@link AMRMTokenIdentifier}.
    * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.

+ 9 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -333,10 +334,12 @@ public class TestRMRestart {
 
     // finish the AM's
     am1.unregisterAppAttempt();
+    rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.FINISHING);
     am1Node.nodeHeartbeat(attempt1.getAppAttemptId(), 1, ContainerState.COMPLETE);
     am1.waitForState(RMAppAttemptState.FINISHED);
     
     am2.unregisterAppAttempt();
+    rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.FINISHING);
     am2Node.nodeHeartbeat(attempt2.getAppAttemptId(), 1, ContainerState.COMPLETE);
     am2.waitForState(RMAppAttemptState.FINISHED);
     
@@ -577,14 +580,16 @@ public class TestRMRestart {
         attempt1.getClientTokenMasterKey(),
         loadedAttempt1.getClientTokenMasterKey());
 
-    // assert secret manager also knows about the key
+    // assert ClientTokenSecretManager also knows about the key
     Assert.assertArrayEquals(clientTokenMasterKey,
         rm2.getClientToAMTokenSecretManager().getMasterKey(attemptId1)
             .getEncoded());
 
-    // Not testing ApplicationTokenSecretManager has the password populated back,
-    // that is needed in work-preserving restart
-
+    // assert AMRMTokenSecretManager also knows about the AMRMToken password
+    Token<AMRMTokenIdentifier> amrmToken = loadedAttempt1.getAMRMToken();
+    Assert.assertArrayEquals(amrmToken.getPassword(),
+      rm2.getAMRMTokenSecretManager().retrievePassword(
+        amrmToken.decodeIdentifier()));
     rm1.stop();
     rm2.stop();
   }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -142,6 +143,16 @@ public abstract class MockAsm extends MockApps {
     public void setQueue(String name) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public boolean isAppSafeToUnregister() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public YarnApplicationState createApplicationState() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.MockApps;
@@ -215,6 +216,15 @@ public class MockRMApp implements RMApp {
   @Override
   public String getApplicationType() {
     return YarnConfiguration.DEFAULT_APPLICATION_TYPE;
-  };
+  }
 
+  @Override
+  public boolean isAppSafeToUnregister() {
+    return true;
+  }
+
+  @Override
+  public YarnApplicationState createApplicationState() {
+    return null;
+  };
 }

+ 48 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 import static org.mockito.Mockito.mock;
 import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,8 +60,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,6 +80,7 @@ public class TestRMAppTransitions {
       YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS;
   private static int appId = 1;
   private DrainDispatcher rmDispatcher;
+  private RMStateStore store;
 
   // ignore all the RM application attempt events
   private static final class TestApplicationAttemptEventDispatcher implements
@@ -171,7 +174,7 @@ public class TestRMAppTransitions {
         mock(ContainerAllocationExpirer.class);
     AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
     AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
-    RMStateStore store = mock(RMStateStore.class);
+    store = mock(RMStateStore.class);
     this.rmContext =
         new RMContextImpl(rmDispatcher, store,
           containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
@@ -278,6 +281,10 @@ public class TestRMAppTransitions {
         (application.getFinishTime() >= application.getStartTime()));
   }
 
+  private void assertAppRemoved(RMApp application){
+    verify(store).removeApplication(application);
+  }
+
   private static void assertKilled(RMApp application) {
     assertTimesAtFinish(application);
     assertAppState(RMAppState.KILLED, application);
@@ -366,15 +373,27 @@ public class TestRMAppTransitions {
     return application;
   }
 
+  protected RMApp testCreateAppRemoving(
+      ApplicationSubmissionContext submissionContext) throws IOException {
+    RMApp application = testCreateAppRunning(submissionContext);
+    RMAppEvent finishingEvent =
+        new RMAppEvent(application.getApplicationId(),
+          RMAppEventType.ATTEMPT_UNREGISTERED);
+    application.handle(finishingEvent);
+    assertAppState(RMAppState.REMOVING, application);
+    assertAppRemoved(application);
+    return application;
+  }
+
   protected RMApp testCreateAppFinishing(
       ApplicationSubmissionContext submissionContext) throws IOException {
     // unmanaged AMs don't use the FINISHING state
     assert submissionContext == null || !submissionContext.getUnmanagedAM();
-    RMApp application = testCreateAppRunning(submissionContext);
-    // RUNNING => FINISHING event RMAppEventType.ATTEMPT_FINISHING
+    RMApp application = testCreateAppRemoving(submissionContext);
+    // REMOVING => FINISHING event RMAppEventType.APP_REMOVED
     RMAppEvent finishingEvent =
         new RMAppEvent(application.getApplicationId(),
-            RMAppEventType.ATTEMPT_FINISHING);
+            RMAppEventType.APP_REMOVED);
     application.handle(finishingEvent);
     assertAppState(RMAppState.FINISHING, application);
     assertTimesAtFinish(application);
@@ -634,6 +653,30 @@ public class TestRMAppTransitions {
     assertFailed(application, ".*Failing the application.*");
   }
 
+  @Test
+  public void testAppRemovingFinished() throws IOException {
+    LOG.info("--- START: testAppRemovingFINISHED ---");
+    RMApp application = testCreateAppRemoving(null);
+    // APP_REMOVING => FINISHED event RMAppEventType.ATTEMPT_FINISHED
+    RMAppEvent finishedEvent = new RMAppFinishedAttemptEvent(
+      application.getApplicationId(), null);
+    application.handle(finishedEvent);
+    rmDispatcher.await();
+    assertAppState(RMAppState.FINISHED, application);
+  }
+
+  @Test
+  public void testAppRemovingKilled() throws IOException {
+    LOG.info("--- START: testAppRemovingKilledD ---");
+    RMApp application = testCreateAppRemoving(null);
+    // APP_REMOVING => KILLED event RMAppEventType.KILL
+    RMAppEvent event =
+        new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
+    application.handle(event);
+    rmDispatcher.await();
+    assertAppState(RMAppState.KILLED, application);
+  }
+
   @Test
   public void testAppFinishingKill() throws IOException {
     LOG.info("--- START: testAppFinishedFinished ---");