
Merge r1476010 through r1476452 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1476453 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 12 years ago
Parent
Current commit
60341dae19
76 files changed, with 1721 insertions and 212 deletions
  1. 22 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
  2. 10 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. 34 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  4. 21 5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  5. 21 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  6. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  7. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  8. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  9. 53 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
  10. 25 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  11. 19 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
  12. 69 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  13. 103 11
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  14. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  15. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  16. 6 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  17. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  18. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  19. 3 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  20. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  21. 86 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
  22. 117 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java
  23. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  24. 4 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
  25. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
  26. 220 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
  27. 11 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java
  28. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  29. 4 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  30. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  31. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
  32. 9 0
      hadoop-yarn-project/CHANGES.txt
  33. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java
  34. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
  35. 1 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
  36. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto
  37. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
  38. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
  39. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
  40. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
  41. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java
  42. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
  43. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
  44. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  45. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  46. 17 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  47. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
  48. 16 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  49. 27 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  50. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java
  51. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java
  52. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
  53. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
  54. 315 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
  55. 10 78
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
  56. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  57. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  58. 88 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  59. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  60. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  61. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
  62. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
  63. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  64. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java
  65. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java
  66. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java
  67. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java
  68. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  69. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
  70. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  71. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
  72. 16 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
  73. 84 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java
  74. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
  75. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java
  76. 52 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServer.java

+ 22 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -446,6 +446,28 @@ public class NetworkTopology {
     return getNode(node.getNetworkLocation());
   }
   
+  /**
+   * Given a string representation of a rack, return its children
+   * @param loc a path-like string representation of a rack
+   * @return a newly allocated list with all the node's children
+   */
+  public List<Node> getDatanodesInRack(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc)) {
+        loc = loc.substring(1);
+      }
+      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+      if (rack == null) {
+        return null;
+      }
+      return new ArrayList<Node>(rack.getChildren());
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
   /** Remove a node
    * Update node counter and rack counter if necessary
    * @param node node to be removed; can be null
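For reference, a minimal caller of the new helper might look like the following sketch (the class, method, and rack path are illustrative, not part of the patch); it covers the null return for an unknown rack:

    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;

    class RackListingExample {
      // Return the datanodes in the given rack, or an empty list when the rack
      // is unknown (getDatanodesInRack returns null in that case).
      static List<Node> nodesInRackOrEmpty(NetworkTopology topology, String rackPath) {
        List<Node> nodes = topology.getDatanodesInRack(rackPath);
        return nodes != null ? nodes : Collections.<Node>emptyList();
      }
    }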

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -22,6 +22,9 @@ Trunk (Unreleased)
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
+    HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
+    can specify favored datanodes for a file's blocks. (ddas)
+
   IMPROVEMENTS
 
     HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -262,6 +265,9 @@ Trunk (Unreleased)
     HDFS-4757. Update FSDirectory#inodeMap when replacing an INodeDirectory
     while setting quota.  (Jing Zhao via szetszwo)
 
+    HDFS-4761. When resetting FSDirectory, the inodeMap should also be reset.
+    (Jing Zhao via szetszwo)
+
   BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS
 
     HDFS-4145. Merge hdfs cmd line scripts from branch-1-win. (David Lao,
@@ -457,6 +463,10 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4346. Add SequentialNumber as a base class for INodeId and
     GenerationStamp.  (szetszwo)
 
+    HDFS-4721. Speed up lease recovery by avoiding stale datanodes and choosing
+    the datanode with the most recent heartbeat as the primary.  (Varun Sharma
+    via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 34 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1210,7 +1210,7 @@ public class DFSClient implements java.io.Closeable {
                              ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize, checksumOpt);
+        replication, blockSize, progress, buffersize, checksumOpt, null);
   }
 
   /**
@@ -1244,6 +1244,29 @@ public class DFSClient implements java.io.Closeable {
                              Progressable progress,
                              int buffersize,
                              ChecksumOpt checksumOpt) throws IOException {
+    return create(src, permission, flag, createParent, replication, blockSize, 
+        progress, buffersize, checksumOpt, null);
+  }
+
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
+   * a hint to where the namenode should place the file blocks.
+   * The favored nodes hint is not persisted in HDFS, so it is honored only at
+   * creation time; HDFS may later move the blocks off the favored nodes during
+   * balancing or re-replication. A value of null means no favored nodes for
+   * this create.
+   */
+  public DFSOutputStream create(String src, 
+                             FsPermission permission,
+                             EnumSet<CreateFlag> flag, 
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             ChecksumOpt checksumOpt,
+                             InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getFileDefault();
@@ -1252,9 +1275,18 @@ public class DFSClient implements java.io.Closeable {
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
+    String[] favoredNodeStrs = null;
+    if (favoredNodes != null) {
+      favoredNodeStrs = new String[favoredNodes.length];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodeStrs[i] = 
+            favoredNodes[i].getAddress().getHostAddress() + ":" 
+                         + favoredNodes[i].getPort();
+      }
+    }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum(checksumOpt));
+        buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
     beginFileLease(src, result);
     return result;
   }

+ 21 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -315,6 +315,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
             return key;
           }
         });
+    private String[] favoredNodes;
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -391,7 +392,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
 
       }
     }
-    
+
+    private void setFavoredNodes(String[] favoredNodes) {
+      this.favoredNodes = favoredNodes;
+    }
+
     /**
      * Initialize for data streaming
      */
@@ -1177,7 +1182,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         while (true) {
           try {
             return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-                block, excludedNodes, fileId);
+                block, excludedNodes, fileId, favoredNodes);
           } catch (RemoteException e) {
             IOException ue = 
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -1318,7 +1323,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   /** Construct a new output stream for creating a file. */
   private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
@@ -1326,12 +1331,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         checksum.getBytesPerChecksum());
 
     streamer = new DataStreamer();
+    if (favoredNodes != null && favoredNodes.length != 0) {
+      streamer.setFavoredNodes(favoredNodes);
+    }
   }
 
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     final HdfsFileStatus stat;
     try {
       stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
@@ -1349,11 +1357,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
                                      SnapshotAccessControlException.class);
     }
     final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-        flag, progress, checksum);
+        flag, progress, checksum, favoredNodes);
     out.start();
     return out;
   }
 
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
+    return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication,
+        blockSize, progress, buffersize, checksum, null);
+  }
+
   /** Construct a new output stream for append. */
   private DFSOutputStream(DFSClient dfsClient, String src,
       Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -268,6 +268,27 @@ public class DistributedFileSystem extends FileSystem {
             : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
         blockSize, progress, null);
   }
+
+  /**
+   * Same as  
+   * {@link #create(Path, FsPermission, boolean, int, short, long, 
+   * Progressable)} with the addition of favoredNodes that is a hint to 
+   * where the namenode should place the file blocks.
+   * The favored nodes hint is not persisted in HDFS, so it is honored only at
+   * creation time; HDFS may later move the blocks off the favored nodes during
+   * balancing or re-replication. A value of null means no favored nodes for
+   * this create.
+   */
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress, InetSocketAddress[] favoredNodes) throws IOException {
+    statistics.incrementWriteOps(1);
+    final DFSOutputStream out = dfs.create(getPathName(f), permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE),
+        true, replication, blockSize, progress, bufferSize, null, favoredNodes);
+    return new HdfsDataOutputStream(out, statistics);
+  }
   
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
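As a usage illustration of the new public overload (the file name, datanode addresses, and ports below are hypothetical, and the hint is best-effort as the javadoc notes):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

    class FavoredNodesExample {
      static void writeWithFavoredNodes(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        if (!(fs instanceof DistributedFileSystem)) {
          return;  // favored nodes are an HDFS-specific hint
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // Hypothetical datanode addresses; the ports are datanode transfer ports.
        InetSocketAddress[] favored = {
            new InetSocketAddress("10.10.1.1", 50010),
            new InetSocketAddress("10.10.1.2", 50010)
        };
        HdfsDataOutputStream out = dfs.create(new Path("/tmp/favored-example"),
            FsPermission.getFileDefault(), true /* overwrite */, 4096,
            (short) 3, 128L * 1024 * 1024, null /* progress */, favored);
        out.write("hello".getBytes("UTF-8"));
        out.close();
      }
    }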

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -306,6 +306,8 @@ public interface ClientProtocol {
    * @param excludeNodes a list of nodes that should not be
    * allocated for the current block
    * @param fileId the id uniquely identifying a file
+   * @param favoredNodes the list of nodes where the client wants the blocks.
+   *          Nodes are identified by either host name or address.
    *
    * @return LocatedBlock allocated block information.
    *
@@ -320,7 +322,8 @@ public interface ClientProtocol {
    */
   @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, 
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -382,12 +382,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     
     try {
       List<DatanodeInfoProto> excl = req.getExcludeNodesList();
+      List<String> favor = req.getFavoredNodesList();
       LocatedBlock result = server.addBlock(
           req.getSrc(),
           req.getClientName(),
           req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
           (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
-              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
+              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
+          (favor == null || favor.size() == 0) ? null : favor
+              .toArray(new String[favor.size()]));
       return AddBlockResponseProto.newBuilder()
           .setBlock(PBHelper.convert(result)).build();
     } catch (IOException e) {

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -312,7 +312,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -322,6 +323,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
       req.setPrevious(PBHelper.convert(previous)); 
     if (excludeNodes != null) 
       req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
+    if (favoredNodes != null) {
+      req.addAllFavoredNodes(Arrays.asList(favoredNodes));
+    }
     try {
       return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {

+ 53 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,7 +42,10 @@ public class BlockInfoUnderConstruction extends BlockInfo {
    */
   private List<ReplicaUnderConstruction> replicas;
 
-  /** A data-node responsible for block recovery. */
+  /**
+   * Index of the primary data node doing the recovery. Useful for log
+   * messages.
+   */
   private int primaryNodeIndex = -1;
 
   /**
@@ -62,6 +66,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
   static class ReplicaUnderConstruction extends Block {
     private DatanodeDescriptor expectedLocation;
     private ReplicaState state;
+    private boolean chosenAsPrimary;
 
     ReplicaUnderConstruction(Block block,
                              DatanodeDescriptor target,
@@ -69,6 +74,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       super(block);
       this.expectedLocation = target;
       this.state = state;
+      this.chosenAsPrimary = false;
     }
 
     /**
@@ -88,6 +94,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       return state;
     }
 
+    /**
+     * Whether the replica was chosen for recovery.
+     */
+    boolean getChosenAsPrimary() {
+      return chosenAsPrimary;
+    }
+
     /**
      * Set replica state.
      */
@@ -95,6 +108,13 @@ public class BlockInfoUnderConstruction extends BlockInfo {
       state = s;
     }
 
+    /**
+     * Set whether this replica was chosen for recovery.
+     */
+    void setChosenAsPrimary(boolean chosenAsPrimary) {
+      this.chosenAsPrimary = chosenAsPrimary;
+    }
+
     /**
      * Is data-node the replica belongs to alive.
      */
@@ -237,19 +257,40 @@ public class BlockInfoUnderConstruction extends BlockInfo {
         + " BlockInfoUnderConstruction.initLeaseRecovery:"
         + " No blocks found, lease removed.");
     }
-
-    int previous = primaryNodeIndex;
-    for(int i = 1; i <= replicas.size(); i++) {
-      int j = (previous + i)%replicas.size();
-      if (replicas.get(j).isAlive()) {
-        primaryNodeIndex = j;
-        DatanodeDescriptor primary = replicas.get(j).getExpectedLocation(); 
-        primary.addBlockToBeRecovered(this);
-        NameNode.blockStateChangeLog.info("BLOCK* " + this
-          + " recovery started, primary=" + primary);
-        return;
+    boolean allLiveReplicasTriedAsPrimary = true;
+    for (int i = 0; i < replicas.size(); i++) {
+      // Check if all replicas have been tried or not.
+      if (replicas.get(i).isAlive()) {
+        allLiveReplicasTriedAsPrimary =
+            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
+      }
+    }
+    if (allLiveReplicasTriedAsPrimary) {
+      // Reset the chosenAsPrimary flag on every replica (alive or not) so they can be tried again.
+      for (int i = 0; i < replicas.size(); i++) {
+        replicas.get(i).setChosenAsPrimary(false);
       }
     }
+    long mostRecentLastUpdate = 0;
+    ReplicaUnderConstruction primary = null;
+    primaryNodeIndex = -1;
+    for(int i = 0; i < replicas.size(); i++) {
+      // Skip alive replicas which have been chosen for recovery.
+      if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
+        continue;
+      }
+      if (replicas.get(i).getExpectedLocation().getLastUpdate() > mostRecentLastUpdate) {
+        primary = replicas.get(i);
+        primaryNodeIndex = i;
+        mostRecentLastUpdate = primary.getExpectedLocation().getLastUpdate();
+      }
+    }
+    if (primary != null) {
+      primary.getExpectedLocation().addBlockToBeRecovered(this);
+      primary.setChosenAsPrimary(true);
+      NameNode.blockStateChangeLog.info("BLOCK* " + this
+        + " recovery started, primary=" + primary);
+    }
   }
 
   void addReplicaIfNotPresent(DatanodeDescriptor dn,
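Restated compactly, the new rule is: among replicas that are alive and not yet tried as primary, pick the one whose datanode heartbeated most recently; once every live replica has been tried, clear the chosenAsPrimary flags and start a new round. A simplified, standalone restatement of that loop (not the actual class; Replica here is a stand-in for ReplicaUnderConstruction):

    import java.util.List;

    class PrimarySelectionSketch {
      static class Replica {
        boolean alive;
        boolean chosenAsPrimary;
        long lastUpdate;  // last heartbeat time of the expected datanode, in ms
      }

      // Return the index of the replica to use as recovery primary, or -1 if none.
      static int choosePrimary(List<Replica> replicas) {
        boolean allTried = true;
        for (Replica r : replicas) {
          if (r.alive && !r.chosenAsPrimary) {
            allTried = false;
          }
        }
        if (allTried) {
          for (Replica r : replicas) {
            r.chosenAsPrimary = false;  // start a new round of attempts
          }
        }
        int best = -1;
        long mostRecent = 0;
        for (int i = 0; i < replicas.size(); i++) {
          Replica r = replicas.get(i);
          if (r.alive && !r.chosenAsPrimary && r.lastUpdate > mostRecent) {
            best = i;
            mostRecent = r.lastUpdate;
          }
        }
        if (best >= 0) {
          replicas.get(best).chosenAsPrimary = true;
        }
        return best;
      }
    }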

+ 25 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -1333,11 +1335,12 @@ public class BlockManager {
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final HashMap<Node, Node> excludedNodes,
-      final long blocksize) throws IOException {
-    // choose targets for the new block to be allocated.
+      final long blocksize, List<String> favoredNodes) throws IOException {
+    List<DatanodeDescriptor> favoredDatanodeDescriptors = 
+        getDatanodeDescriptors(favoredNodes);
     final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
-        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
-        excludedNodes, blocksize);
+        numOfReplicas, client, excludedNodes, blocksize, 
+        favoredDatanodeDescriptors);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1350,6 +1353,24 @@ public class BlockManager {
     return targets;
   }
 
+  /**
+   * Get list of datanode descriptors for given list of nodes. Nodes are
+   * hostaddress:port or just hostaddress.
+   */
+  List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = null;
+    if (nodes != null) {
+      datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
+      for (int i = 0; i < nodes.size(); i++) {
+        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
+        if (node != null) {
+          datanodeDescriptors.add(node);
+        }
+      }
+    }
+    return datanodeDescriptors;
+  }
+
   /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -118,6 +118,25 @@ public abstract class BlockPlacementPolicy {
     return chooseTarget(srcBC.getName(), numOfReplicas, writer,
                         chosenNodes, false, excludedNodes, blocksize);
   }
+  
+  /**
+   * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean, 
+   * HashMap, long)} with the added parameter {@code favoredNodes}.
+   * @param favoredNodes datanodes that should be favored as targets. This
+   *          is only a hint and due to cluster state, namenode may not be 
+   *          able to place the blocks on these datanodes.
+   */
+  DatanodeDescriptor[] chooseTarget(String src,
+      int numOfReplicas, DatanodeDescriptor writer,
+      HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    // This class does not provide the functionality of placing
+    // a block in favored datanodes. The implementations of this class
+    // are expected to provide this functionality
+    return chooseTarget(src, numOfReplicas, writer, 
+        new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes, 
+        blocksize);
+  }
 
   /**
    * Verify that the block is replicated on at least minRacks different racks

+ 69 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         excludedNodes, blocksize);
   }
 
+  @Override
+  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
+      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    try {
+      if (favoredNodes == null || favoredNodes.size() == 0) {
+        // Favored nodes not specified, fall back to regular block placement.
+        return chooseTarget(src, numOfReplicas, writer,
+            new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+            excludedNodes, blocksize);
+      }
+
+      HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
+          new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+
+      // Choose favored nodes
+      List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+      boolean avoidStaleNodes = stats != null
+          && stats.isAvoidingStaleDataNodesForWrite();
+      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+        DatanodeDescriptor favoredNode = favoredNodes.get(i);
+        // Choose a single node which is local to favoredNode.
+        // 'results' is updated within chooseLocalNode
+        DatanodeDescriptor target = chooseLocalNode(favoredNode,
+            favoriteAndExcludedNodes, blocksize, 
+            getMaxNodesPerRack(results, 
+                numOfReplicas)[1], results, avoidStaleNodes);
+        if (target == null) {
+          LOG.warn("Could not find a target for file " + src
+              + " with favored node " + favoredNode); 
+          continue;
+        }
+        favoriteAndExcludedNodes.put(target, target);
+      }
+
+      if (results.size() < numOfReplicas) {        
+        // Not enough favored nodes, choose other nodes.
+        numOfReplicas -= results.size();
+        DatanodeDescriptor[] remainingTargets = 
+            chooseTarget(src, numOfReplicas, writer, results,
+                false, favoriteAndExcludedNodes, blocksize);
+        for (int i = 0; i < remainingTargets.length; i++) {
+          results.add(remainingTargets[i]);
+        }
+      }
+      return results.toArray(new DatanodeDescriptor[results.size()]);
+    } catch (NotEnoughReplicasException nr) {
+      // Fall back to regular block placement disregarding favored nodes hint
+      return chooseTarget(src, numOfReplicas, writer, 
+          new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+          excludedNodes, blocksize);
+    }
+  }
+
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       excludedNodes = new HashMap<Node, Node>();
     }
      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
+    numOfReplicas = result[0];
+    int maxNodesPerRack = result[1];
       
     List<DatanodeDescriptor> results = 
       new ArrayList<DatanodeDescriptor>(chosenNodes);
@@ -174,6 +222,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return getPipeline((writer==null)?localNode:writer,
                        results.toArray(new DatanodeDescriptor[results.size()]));
   }
+
+  private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
+      int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
     
   /* choose <i>numOfReplicas</i> from all data nodes */
   private DatanodeDescriptor chooseTarget(int numOfReplicas,
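As a quick worked example of the extracted getMaxNodesPerRack helper (cluster figures are hypothetical): on a 6-node, 2-rack cluster with no nodes chosen yet, asking for 3 replicas leaves numOfReplicas at 3 and yields maxNodesPerRack = (3-1)/2+2 = 3; asking for 10 replicas first clips numOfReplicas to the cluster size of 6, giving maxNodesPerRack = (6-1)/2+2 = 4.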

+ 103 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -213,7 +213,7 @@ public class DatanodeManager {
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
   }
-  
+
   private static long getStaleIntervalFromConf(Configuration conf,
       long heartbeatExpireInterval) {
     long staleInterval = conf.getLong(
@@ -326,6 +326,68 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByHost(host);
   }
 
+  /** @return the datanode descriptor for the given host and transfer port. */
+  public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) {
+    return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
+  }
+
+  /**
+   * Given a datanode address or host name, return the matching
+   * DatanodeDescriptor. If no exact match is found, fall back to a
+   * machine-local datanode, then a rack-local datanode, and finally any
+   * random datanode in the cluster.
+   *
+   * @param address host address, optionally followed by ":" and the transfer port
+   * @return the best match for the given datanode
+   * @throws IOException when no datanode is found for given address
+   */
+  DatanodeDescriptor getDatanodeDescriptor(String address) {
+    DatanodeDescriptor node = null;
+    int colon = address.indexOf(":");
+    int xferPort;
+    String host = address;
+    if (colon > 0) {
+      host = address.substring(0, colon);
+      xferPort = Integer.parseInt(address.substring(colon+1));
+      node = getDatanodeByXferAddr(host, xferPort);
+    }
+    if (node == null) {
+      node = getDatanodeByHost(host);
+    }
+    if (node == null) {
+      String networkLocation = resolveNetworkLocation(host);
+
+      // If the current cluster doesn't contain the node, fallback to
+      // something machine local and then rack local.
+      List<Node> rackNodes = getNetworkTopology()
+                                   .getDatanodesInRack(networkLocation);
+      if (rackNodes != null) {
+        // Try something machine local.
+        for (Node rackNode : rackNodes) {
+          if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) {
+            node = (DatanodeDescriptor) rackNode;
+            break;
+          }
+        }
+
+        // Try something rack local.
+        if (node == null && !rackNodes.isEmpty()) {
+          node = (DatanodeDescriptor) (rackNodes
+              .get(DFSUtil.getRandom().nextInt(rackNodes.size())));
+        }
+      }
+
+      // If we can't even choose rack local, just choose any node in the
+      // cluster.
+      if (node == null) {
+        node = (DatanodeDescriptor)getNetworkTopology()
+                                   .chooseRandom(NodeBase.ROOT);
+      }
+    }
+    return node;
+  }
+
+
   /** Get a datanode descriptor given corresponding storageID */
   DatanodeDescriptor getDatanode(final String storageID) {
     return datanodeMap.get(storageID);
@@ -455,8 +517,13 @@ public class DatanodeManager {
     }
   }
 
+  public String resolveNetworkLocation(String host) {
+    DatanodeID d = parseDNFromHostsEntry(host);
+    return resolveNetworkLocation(d);
+  }
+
   /* Resolve a node's network location */
-  private void resolveNetworkLocation (DatanodeDescriptor node) {
+  private String resolveNetworkLocation (DatanodeID node) {
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       names.add(node.getIpAddr());
@@ -474,7 +541,7 @@ public class DatanodeManager {
     } else {
       networkLocation = rName.get(0);
     }
-    node.setNetworkLocation(networkLocation);
+    return networkLocation;
   }
 
   private boolean inHostsList(DatanodeID node) {
@@ -707,7 +774,7 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
           
           // resolve network location
-          resolveNetworkLocation(nodeS);
+          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
           getNetworkTopology().add(nodeS);
             
           // also treat the registration message as a heartbeat
@@ -739,7 +806,7 @@ public class DatanodeManager {
         = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
       boolean success = false;
       try {
-        resolveNetworkLocation(nodeDescr);
+        nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
         networktopology.add(nodeDescr);
   
         // register new datanode
@@ -875,7 +942,7 @@ public class DatanodeManager {
         (numStaleNodes <= heartbeatManager.getLiveDatanodeCount()
             * ratioUseStaleDataNodesForWrite);
   }
-  
+
   /**
    * @return The time interval used to mark DataNodes as stale.
    */
@@ -1093,7 +1160,7 @@ public class DatanodeManager {
    * failed.  As a special case, the loopback address is also considered
    * acceptable.  This is particularly important on Windows, where 127.0.0.1 does
    * not resolve to "localhost".
-   * 
+   *
    * @param address InetAddress to check
    * @return boolean true if name resolution successful or address is loopback
    */
@@ -1127,7 +1194,7 @@ public class DatanodeManager {
           setDatanodeDead(nodeinfo);
           throw new DisallowedDatanodeException(nodeinfo);
         }
-         
+
         if (nodeinfo == null || !nodeinfo.isAlive) {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
@@ -1142,9 +1209,34 @@ public class DatanodeManager {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
           for (BlockInfoUnderConstruction b : blocks) {
-            brCommand.add(new RecoveringBlock(
-                new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
-                    .getBlockRecoveryId()));
+            DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+            // Skip stale nodes during recovery: nodes that have not heartbeated for some time (30s by default).
+            List<DatanodeDescriptor> recoveryLocations =
+                new ArrayList<DatanodeDescriptor>(expectedLocations.length);
+            for (int i = 0; i < expectedLocations.length; i++) {
+              if (!expectedLocations[i].isStale(this.staleInterval)) {
+                recoveryLocations.add(expectedLocations[i]);
+              }
+            }
+            // If we only get 1 replica after eliminating stale nodes, then choose all
+            // replicas for recovery and let the primary data node handle failures.
+            if (recoveryLocations.size() > 1) {
+              if (recoveryLocations.size() != expectedLocations.length) {
+                LOG.info("Skipped stale nodes for recovery : " +
+                    (expectedLocations.length - recoveryLocations.size()));
+              }
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                  b.getBlockRecoveryId()));
+            } else {
+              // If too many replicas are stale, then choose all replicas to participate
+              // in block recovery.
+              brCommand.add(new RecoveringBlock(
+                  new ExtendedBlock(blockPoolId, b),
+                  expectedLocations,
+                  b.getBlockRecoveryId()));
+            }
           }
           return new DatanodeCommand[] { brCommand };
         }
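The staleness test used when filtering recovery locations above is the existing DatanodeInfo.isStale(staleInterval) check; roughly, a node counts as stale when its last heartbeat is older than the configured stale interval. A hedged, simplified sketch of that predicate (not the actual DatanodeInfo code):

    class StalenessSketch {
      // A datanode is considered stale when its last heartbeat is older than
      // the configured stale interval (30 seconds by default).
      static boolean isStale(long lastUpdateMillis, long staleIntervalMillis) {
        return System.currentTimeMillis() - lastUpdateMillis >= staleIntervalMillis;
      }
    }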

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -2452,6 +2452,8 @@ public class FSDirectory implements Closeable {
     try {
       setReady(false);
       rootDir = createRoot(getFSNamesystem());
+      inodeMap.clear();
+      addToInodeMapUnprotected(rootDir);
       nameCache.reset();
     } finally {
       writeUnlock();

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2228,7 +2228,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * client to "try again later".
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
+      ExtendedBlock previous, HashMap<Node, Node> excludedNodes, 
+      List<String> favoredNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -2268,8 +2269,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
-        src, replication, clientNode, excludedNodes, blockSize);
+    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( 
+        src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
 
     // Part II.
     // Allocate a new block, add it to the INode and the BlocksMap. 

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -484,7 +485,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
+      String[] favoredNodes)
       throws IOException {
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
@@ -497,8 +499,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
         excludedNodesSet.put(node, node);
       }
     }
+    List<String> favoredNodesList = (favoredNodes == null) ? null
+        : Arrays.asList(favoredNodes);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
-        clientName, previous, excludedNodesSet);
+        clientName, previous, excludedNodesSet, favoredNodesList);
     if (locatedBlock != null)
       metrics.incrAddBlockOps();
     return locatedBlock;

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -121,6 +121,7 @@ message AddBlockRequestProto {
   optional ExtendedBlockProto previous = 3;
   repeated DatanodeInfoProto excludeNodes = 4;
   optional uint64 fileId = 5 [default = 0];  // default as a bogus id
+  repeated string favoredNodes = 6; //the set of datanodes to use for the block
 }
 
 message AddBlockResponseProto {

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1072,7 +1072,10 @@
     otherwise this may cause too frequent change of stale states. 
     We thus set a minimum stale interval value (the default value is 3 times 
     of heartbeat interval) and guarantee that the stale interval cannot be less
-    than the minimum value.
+    than the minimum value. A stale data node is avoided during lease/block
+    recovery. It can be conditionally avoided for reads (see
+    dfs.namenode.avoid.read.stale.datanode) and for writes (see
+    dfs.namenode.avoid.write.stale.datanode).
   </description>
 </property>
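For completeness, the stale-datanode settings referenced in this description can also be set programmatically, for example in tests or client-side configuration (the values below are illustrative; the property names match the description above):

    import org.apache.hadoop.conf.Configuration;

    class StaleNodeConfigExample {
      static Configuration staleAwareConf() {
        Configuration conf = new Configuration();
        // Mark a datanode stale after 30 seconds without a heartbeat (the default).
        conf.setLong("dfs.namenode.stale.datanode.interval", 30 * 1000L);
        // Conditionally avoid stale datanodes for reads and writes; they are
        // avoided during lease/block recovery regardless of these flags.
        conf.setBoolean("dfs.namenode.avoid.read.stale.datanode", true);
        conf.setBoolean("dfs.namenode.avoid.write.stale.datanode", true);
        return conf;
      }
    }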
 

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -241,7 +241,7 @@ public class TestDFSClientRetries {
                          anyString(),
                          any(ExtendedBlock.class),
                          any(DatanodeInfo[].class),
-                         anyLong())).thenAnswer(answer);
+                         anyLong(), any(String[].class))).thenAnswer(answer);
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
@@ -390,7 +390,7 @@ public class TestDFSClientRetries {
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
 
       doAnswer(new Answer<Boolean>() {
 
@@ -432,7 +432,7 @@ public class TestDFSClientRetries {
       Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
       Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock>any());

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -519,7 +519,7 @@ public class TestFileCreation {
 
       // add one block to the file
       LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
-          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
+          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null);
       System.out.println("testFileCreationError2: "
           + "Added block " + location.getBlock());
 
@@ -570,7 +570,7 @@ public class TestFileCreation {
       createFile(dfs, f, 3);
       try {
         cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
-            null, null, INodeId.GRANDFATHER_INODE_ID);
+            null, null, INodeId.GRANDFATHER_INODE_ID, null);
         fail();
       } catch(IOException ioe) {
         FileSystem.LOG.info("GOOD!", ioe);

+ 86 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.junit.Test;
+
+/**
+ * This class provides tests for BlockInfoUnderConstruction class
+ */
+public class TestBlockInfoUnderConstruction {
+  @Test
+  public void testInitializeBlockRecovery() throws Exception {
+    DatanodeDescriptor dd1 = DFSTestUtil.getDatanodeDescriptor("10.10.1.1",
+        "default");
+    DatanodeDescriptor dd2 = DFSTestUtil.getDatanodeDescriptor("10.10.1.2",
+        "default");
+    DatanodeDescriptor dd3 = DFSTestUtil.getDatanodeDescriptor("10.10.1.3",
+        "default");
+    dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
+    BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+        new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP),
+        3,
+        BlockUCState.UNDER_CONSTRUCTION,
+        new DatanodeDescriptor[] {dd1, dd2, dd3});
+
+    // Recovery attempt #1.
+    long currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 3 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 2 * 1000);
+    blockInfo.initializeBlockRecovery(1);
+    BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #2.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 3 * 1000);
+    blockInfo.initializeBlockRecovery(2);
+    blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #3.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime - 3 * 1000);
+    currentTime = System.currentTimeMillis();
+    blockInfo.initializeBlockRecovery(3);
+    blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+
+    // Recovery attempt #4.
+    // Reset everything. And again pick DN with most recent heart beat.
+    currentTime = System.currentTimeMillis();
+    dd1.setLastUpdate(currentTime - 2 * 1000);
+    dd2.setLastUpdate(currentTime - 1 * 1000);
+    dd3.setLastUpdate(currentTime);
+    currentTime = System.currentTimeMillis();
+    blockInfo.initializeBlockRecovery(3);
+    blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
+    assertEquals(blockInfoRecovery[0], blockInfo);
+  }
+}

+ 117 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java

@@ -20,17 +20,21 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -56,14 +60,12 @@ public class TestHeartbeatHandling {
       final HeartbeatManager hm = namesystem.getBlockManager(
           ).getDatanodeManager().getHeartbeatManager();
       final String poolId = namesystem.getBlockPoolId();
-      final DatanodeRegistration nodeReg = 
+      final DatanodeRegistration nodeReg =
         DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
-
-
       final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
-      
+
       final int REMAINING_BLOCKS = 1;
-      final int MAX_REPLICATE_LIMIT = 
+      final int MAX_REPLICATE_LIMIT =
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
       final int MAX_INVALIDATE_LIMIT = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
@@ -83,7 +85,7 @@ public class TestHeartbeatHandling {
           assertEquals(1, cmds.length);
           assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
           assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
-          
+
           ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
           for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
             blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
@@ -122,4 +124,113 @@ public class TestHeartbeatHandling {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test if
+   * {@link FSNamesystem#handleHeartbeat}
+   * correctly selects data node targets for block recovery.
+   */
+  @Test
+  public void testHeartbeatBlockRecovery() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final HeartbeatManager hm = namesystem.getBlockManager(
+          ).getDatanodeManager().getHeartbeatManager();
+      final String poolId = namesystem.getBlockPoolId();
+      final DatanodeRegistration nodeReg1 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
+      final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
+      final DatanodeRegistration nodeReg2 =
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
+      final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
+      final DatanodeRegistration nodeReg3 = 
+        DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
+      final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
+
+      try {
+        namesystem.writeLock();
+        synchronized(hm) {
+          NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg2, dd2, namesystem);
+          NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
+
+          // Test with all alive nodes.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          dd2.setLastUpdate(System.currentTimeMillis());
+          dd3.setLastUpdate(System.currentTimeMillis());
+          BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          DatanodeCommand[] cmds =
+              NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          BlockRecoveryCommand recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          DatanodeInfo[] recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+
+          // Test with one stale node.
+          dd1.setLastUpdate(System.currentTimeMillis());
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis());
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          assertEquals(2, recoveringNodes.length);
+          // dd2 is skipped.
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd3);
+
+          // Test with all stale nodes.
+          dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
+          // More than the default stale interval of 30 seconds.
+          dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
+          dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
+          blockInfo = new BlockInfoUnderConstruction(
+              new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), 3,
+              BlockUCState.UNDER_RECOVERY,
+              new DatanodeDescriptor[] {dd1, dd2, dd3});
+          dd1.addBlockToBeRecovered(blockInfo);
+          cmds = NameNodeAdapter.sendHeartBeat(nodeReg1, dd1, namesystem).getCommands();
+          assertEquals(1, cmds.length);
+          assertEquals(DatanodeProtocol.DNA_RECOVERBLOCK, cmds[0].getAction());
+          recoveryCommand = (BlockRecoveryCommand)cmds[0];
+          assertEquals(1, recoveryCommand.getRecoveringBlocks().size());
+          recoveringNodes = recoveryCommand.getRecoveringBlocks()
+              .toArray(new BlockRecoveryCommand.RecoveringBlock[0])[0].getLocations();
+          // Only dd1 is non-stale here (its heartbeat is the one being
+          // processed), so the stale filter leaves a single replica and the
+          // recovery command falls back to all three expected locations.
+          assertEquals(3, recoveringNodes.length);
+          assertEquals(recoveringNodes[0], (DatanodeInfo)dd1);
+          assertEquals(recoveringNodes[1], (DatanodeInfo)dd2);
+          assertEquals(recoveringNodes[2], (DatanodeInfo)dd3);
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
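The three scenarios above assume a particular filtering rule in the namenode's heartbeat handling: stale replicas (no heartbeat within the stale interval, 30 seconds by default) are dropped from the recovery command, but if the filter would leave one replica or none, all expected locations are kept so recovery can still make progress. A rough sketch of that rule, using illustrative names rather than the actual DatanodeManager types:

    import java.util.ArrayList;
    import java.util.List;

    class RecoveryLocationFilter {
      static class Node {
        long lastUpdate;
        Node(long lastUpdate) { this.lastUpdate = lastUpdate; }
        boolean isStale(long staleIntervalMs) {
          return System.currentTimeMillis() - lastUpdate > staleIntervalMs;
        }
      }

      // Keep only fresh replicas, but fall back to every expected location
      // when fewer than two fresh replicas would remain.
      static List<Node> chooseRecoveryLocations(List<Node> expected,
          long staleIntervalMs) {
        List<Node> fresh = new ArrayList<Node>();
        for (Node n : expected) {
          if (!n.isStale(staleIntervalMs)) {
            fresh.add(n);
          }
        }
        return fresh.size() > 1 ? fresh : expected;
      }
    }

This is why the last case still asserts three locations: dd1's own heartbeat makes it the only fresh replica, so the command falls back to all three.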

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -1059,7 +1059,7 @@ public class NNThroughputBenchmark {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
         LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
-            prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
+            prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java

@@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
 import java.lang.reflect.Field;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,7 +109,7 @@ public class TestAddBlockRetry {
         if(count == 1) { // run second addBlock()
           LOG.info("Starting second addBlock for " + src);
           nn.addBlock(src, "clientName", null, null,
-              INodeId.GRANDFATHER_INODE_ID);
+              INodeId.GRANDFATHER_INODE_ID, null);
           LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
           assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
           lb2 = lbs.get(0);
@@ -119,7 +120,7 @@ public class TestAddBlockRetry {
       }
     }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
         Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
-        Mockito.anyLong());
+        Mockito.anyLong(), Mockito.<List<String>>any());
 
     // create file
     nn.create(src, FsPermission.getFileDefault(),
@@ -129,7 +130,7 @@ public class TestAddBlockRetry {
 
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
-    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
+    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null);
 
     // check locations
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
@@ -1061,6 +1062,10 @@ public class TestCheckpoint {
       secondary = startSecondaryNameNode(conf);
       secondary.doCheckpoint();
       
+      FSDirectory secondaryFsDir = secondary.getFSNamesystem().dir;
+      INode rootInMap = secondaryFsDir.getInode(secondaryFsDir.rootDir.getId());
+      Assert.assertSame(rootInMap, secondaryFsDir.rootDir);
+      
       fileSys.delete(tmpDir, true);
       fileSys.mkdirs(tmpDir);
       secondary.doCheckpoint();

+ 220 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java

@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+
+public class TestFavoredNodesEndToEnd {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private final static int NUM_DATA_NODES = 10;
+  private final static int NUM_FILES = 10;
+  private final static byte[] SOME_BYTES = new String("foo").getBytes();
+  private static DistributedFileSystem dfs;
+  private static ArrayList<DataNode> datanodes;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
+        .build();
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    datanodes = cluster.getDataNodes();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (cluster != null) { 
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFavoredNodesEndToEnd() throws Exception {
+    //create 10 files with random preferred nodes
+    for (int i = 0; i < NUM_FILES; i++) {
+      Random rand = new Random(System.currentTimeMillis() + i);
+      //pass a newly created rand so as to get a uniform distribution each time
+      //without too many collisions (see the do-while loops in getDatanodes)
+      InetSocketAddress datanode[] = getDatanodes(rand);
+      Path p = new Path("/filename"+i);
+      FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+          4096, (short)3, (long)4096, null, datanode);
+      out.write(SOME_BYTES);
+      out.close();
+      BlockLocation[] locations = 
+          dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+              Long.MAX_VALUE);
+      //make sure we have exactly one block location, and three hosts
+      assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+      //verify the file was created on the right nodes
+      for (BlockLocation loc : locations) {
+        String[] hosts = loc.getNames();
+        String[] hosts1 = getStringForInetSocketAddrs(datanode);
+        assertTrue(compareNodes(hosts, hosts1));
+      }
+    }
+  }
+
+  @Test
+  public void testWhenFavoredNodesNotPresent() throws Exception {
+    //when we ask for favored nodes but those nodes are not there, we should
+    //get some other nodes; in other words, the write to HDFS should not fail,
+    //and getBlockLocations on the file should return one block location
+    //with three hosts
+    Random rand = new Random(System.currentTimeMillis());
+    InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
+    for (int i = 0; i < 3; i++) {
+      arbitraryAddrs[i] = getArbitraryLocalHostAddr();
+    }
+    Path p = new Path("/filename-foo-bar");
+    FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+        4096, (short)3, (long)4096, null, arbitraryAddrs);
+    out.write(SOME_BYTES);
+    out.close();
+    BlockLocation[] locations = 
+        dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+            Long.MAX_VALUE);
+    assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+  }
+
+  @Test
+  public void testWhenSomeNodesAreNotGood() throws Exception {
+    //make some datanode not "good" so that even if the client prefers it,
+    //the namenode would not give it as a replica to write to
+    DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
+           .getDatanodeManager().getDatanodeByXferAddr(
+               datanodes.get(0).getXferAddress().getAddress().getHostAddress(), 
+               datanodes.get(0).getXferAddress().getPort());
+    //set the decommission status to true so that 
+    //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
+    d.setDecommissioned();
+    InetSocketAddress addrs[] = new InetSocketAddress[3];
+    for (int i = 0; i < 3; i++) {
+      addrs[i] = datanodes.get(i).getXferAddress();
+    }
+    Path p = new Path("/filename-foo-bar-baz");
+    FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+        4096, (short)3, (long)4096, null, addrs);
+    out.write(SOME_BYTES);
+    out.close();
+    BlockLocation[] locations = 
+        dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+            Long.MAX_VALUE);
+    //reset the state
+    d.stopDecommission();
+    assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+    //also make sure that the datanode[0] is not in the list of hosts
+    String datanode0 = 
+        datanodes.get(0).getXferAddress().getAddress().getHostAddress()
+        + ":" + datanodes.get(0).getXferAddress().getPort();
+    for (int i = 0; i < 3; i++) {
+      if (locations[0].getNames()[i].equals(datanode0)) {
+        fail(datanode0 + " not supposed to be a replica for the block");
+      }
+    }
+  }
+
+  private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) {
+    String strs[] = new String[datanode.length];
+    for (int i = 0; i < datanode.length; i++) {
+      strs[i] = datanode[i].getAddress().getHostAddress() + ":" + 
+       datanode[i].getPort();
+    }
+    return strs;
+  }
+
+  private boolean compareNodes(String[] dnList1, String[] dnList2) {
+    for (int i = 0; i < dnList1.length; i++) {
+      boolean matched = false;
+      for (int j = 0; j < dnList2.length; j++) {
+        if (dnList1[i].equals(dnList2[j])) {
+          matched = true;
+          break;
+        }
+      }
+      if (matched == false) {
+        fail(dnList1[i] + " not a favored node");
+      }
+    }
+    return true;
+  }
+
+  private InetSocketAddress[] getDatanodes(Random rand) {
+    //Get some unique random indexes
+    int idx1 = rand.nextInt(NUM_DATA_NODES);
+    int idx2;
+    
+    do {
+      idx2 = rand.nextInt(NUM_DATA_NODES);
+    } while (idx1 == idx2);
+    
+    int idx3;
+    do {
+      idx3 = rand.nextInt(NUM_DATA_NODES);
+    } while (idx2 == idx3 || idx1 == idx3);
+    
+    InetSocketAddress[] addrs = new InetSocketAddress[3];
+    addrs[0] = datanodes.get(idx1).getXferAddress();
+    addrs[1] = datanodes.get(idx2).getXferAddress();
+    addrs[2] = datanodes.get(idx3).getXferAddress();
+    return addrs;
+  }
+
+  private InetSocketAddress getArbitraryLocalHostAddr() 
+      throws UnknownHostException{
+    Random rand = new Random(System.currentTimeMillis());
+    int port = rand.nextInt(65535);
+    while (true) {
+      boolean conflict = false;
+      for (DataNode d : datanodes) {
+        if (d.getXferAddress().getPort() == port) {
+          port = rand.nextInt(65535);
+          conflict = true;
+        }
+      }
+      if (conflict == false) {
+        break;
+      }
+    }
+    return new InetSocketAddress(InetAddress.getLocalHost(), port);
+  }
+}
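For context, the overload this test drives is the client-side entry point for favored nodes: DistributedFileSystem#create accepts an array of preferred datanode addresses, and the namenode treats them as placement hints that it may ignore when a favored node is missing or not a good target (the two tests above cover exactly those cases). A minimal usage sketch against that overload, assuming the default filesystem is HDFS; the hostnames and ports are placeholders:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class FavoredNodesUsage {
      static void writeWithFavoredNodes(Configuration conf) throws Exception {
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        // Hints only: unavailable or unsuitable favored nodes are replaced by
        // the normal block placement policy, so the write does not fail.
        InetSocketAddress[] favored = {
            new InetSocketAddress("dn1.example.com", 50010),
            new InetSocketAddress("dn2.example.com", 50010),
            new InetSocketAddress("dn3.example.com", 50010)
        };
        FSDataOutputStream out = dfs.create(new Path("/favored-file"),
            FsPermission.getDefault(), true, 4096, (short) 3, 4096L,
            null /* progress */, favored);
        out.write("example".getBytes());
        out.close();
      }
    }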

+ 11 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java

@@ -521,9 +521,17 @@ public class TestPipelinesFailover {
         storedBlock instanceof BlockInfoUnderConstruction);
     BlockInfoUnderConstruction ucBlock =
       (BlockInfoUnderConstruction)storedBlock;
-    // We expect that the first indexed replica will be the one
-    // to be in charge of the synchronization / recovery protocol.
-    DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0];
+    // We expect that the replica with the most recent heart beat will be
+    // the one to be in charge of the synchronization / recovery protocol.
+    DatanodeDescriptor[] datanodes = ucBlock.getExpectedLocations();
+    DatanodeDescriptor expectedPrimary = datanodes[0];
+    long mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+    for (int i = 1; i < datanodes.length; i++) {
+      if (datanodes[i].getLastUpdate() > mostRecentLastUpdate) {
+        expectedPrimary = datanodes[i];
+        mostRecentLastUpdate = expectedPrimary.getLastUpdate();
+      }
+    }
     return expectedPrimary;
   }
 

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -356,6 +356,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5178. Update MR App to set progress in ApplicationReport after
     YARN-577. (Hitesh Shah via vinodkv)
 
+    MAPREDUCE-5167. Update MR App after YARN-562 to use the new builder API
+    for the container. (Jian He via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 4 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -1094,12 +1094,12 @@ public abstract class TaskAttemptImpl implements
         + taInfo.getPort());
     String nodeHttpAddress = StringInterner.weakIntern(taInfo.getHostname() + ":"
         + taInfo.getHttpPort());
-    // Resource/Priority/Tokens are only needed while launching the
-    // container on an NM, these are already completed tasks, so setting them to
-    // null
+    // Resource/Priority/Tokens and RMIdentifier are only needed while
+    // launching the container on an NM; these are already completed tasks, so
+    // set them to null and the RMIdentifier to 0
     container =
         BuilderUtils.newContainer(containerId, containerNodeId,
-          nodeHttpAddress, null, null, null);
+          nodeHttpAddress, null, null, null, 0);
     computeRackAndLocality();
     launchTime = taInfo.getStartTime();
     finishTime = (taInfo.getFinishTime() != -1) ?

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -519,7 +519,7 @@ public class MRApp extends MRAppMaster {
         cId.setId(containerCount++);
         NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
         Container container = BuilderUtils.newContainer(cId, nodeId,
-            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null, 0);
         JobID id = TypeConverter.fromYarn(applicationId);
         JobId jobId = TypeConverter.toYarn(id);
         getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java

@@ -243,7 +243,7 @@ public class MRAppBenchmark {
                       .newContainer(containerId, BuilderUtils.newNodeId("host"
                           + containerId.getId(), 2345),
                         "host" + containerId.getId() + ":5678", req
-                          .getCapability(), req.getPriority(), null));
+                          .getCapability(), req.getPriority(), null, 0));
                   }
                 }
 

+ 9 - 0
hadoop-yarn-project/CHANGES.txt

@@ -187,6 +187,12 @@ Release 2.0.5-beta - UNRELEASED
     YARN-595. Refactor fair scheduler to use common Resources. (Sandy Ryza
     via tomwhite)
 
+    YARN-562. Modified NM to reject any containers allocated by a previous
+    ResourceManager. (Jian He via vinodkv)
+
+    YARN-591. Moved RM recovery related records out of public API as they do not
+    belong there. (vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -596,6 +602,9 @@ Release 0.23.8 - UNRELEASED
 
   BUG FIXES
 
+    YARN-363. Add webapps/proxy directory without which YARN proxy-server fails
+    when started in stand-alone mode. (Kenji Kikushima via vinodkv)
+
 Release 0.23.7 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java

@@ -135,4 +135,16 @@ public interface Container extends Comparable<Container> {
   @Private
   @Unstable
   void setContainerToken(ContainerToken containerToken);
+
+  /**
+   * Get the RMIdentifier of the RM that allocated this container.
+   * @return RMIdentifier
+   */
+  @Private
+  @Unstable
+  long getRMIdentifer();
+
+  @Private
+  @Unstable
+  void setRMIdentifier(long rmIdentifier);
 }
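The two accessors above are how a Container record remembers which ResourceManager allocated it; the NodeManager later compares that value against the identifier it received at registration. As a hedged illustration, a container tagged with an RM identifier can be built through the updated BuilderUtils#newContainer overload shown further down in this change; every value here other than rmIdentifier is a placeholder:

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.util.BuilderUtils;

    class TaggedContainerBuilder {
      // Build a Container record stamped with the allocating RM's identifier.
      static Container build(ContainerId id, NodeId nodeId, long rmIdentifier) {
        return BuilderUtils.newContainer(id, nodeId, "host:8042",
            null /* resource */, null /* priority */, null /* token */,
            rmIdentifier);
      }
    }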

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java

@@ -230,6 +230,18 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
     this.containerToken = containerToken;
   }
 
+  @Override
+  public long getRMIdentifer() {
+    ContainerProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getRmIdentifier();
+  }
+
+  @Override
+  public void setRMIdentifier(long rmIdentifier) {
+    maybeInitBuilder();
+    builder.setRmIdentifier((rmIdentifier));
+  }
+
   private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
     return new ContainerIdPBImpl(p);
   }

+ 1 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto

@@ -68,6 +68,7 @@ message ContainerProto {
   optional ResourceProto resource = 4;
   optional PriorityProto priority = 5;
   optional hadoop.common.TokenProto container_token = 6;
+  optional int64 rm_identifier = 7;
 }
 
 enum YarnApplicationStateProto {
@@ -311,16 +312,3 @@ message StringBytesMapProto {
   optional string key = 1;
   optional bytes value = 2;
 }
-
-////////////////////////////////////////////////////////////////////////
-////// From recovery////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////
-message ApplicationStateDataProto {
-  optional int64 submit_time = 1;
-  optional ApplicationSubmissionContextProto application_submission_context = 2;
-}
-
-message ApplicationAttemptStateDataProto {
-  optional ApplicationAttemptIdProto attemptId = 1;
-  optional ContainerProto master_container = 2;
-}

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_server_resourcemanager_service_protos.proto

@@ -21,6 +21,7 @@ option java_outer_classname = "YarnServerResourceManagerServiceProtos";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
+import "yarn_protos.proto";
 
 message RefreshQueuesRequestProto {
 }
@@ -59,3 +60,16 @@ message GetGroupsForUserRequestProto {
 message GetGroupsForUserResponseProto {
   repeated string groups = 1;
 }
+
+////////////////////////////////////////////////////////////////////////
+////// RM recovery related records /////////////////////////////////////
+////////////////////////////////////////////////////////////////////////
+message ApplicationStateDataProto {
+  optional int64 submit_time = 1;
+  optional ApplicationSubmissionContextProto application_submission_context = 2;
+}
+
+message ApplicationAttemptStateDataProto {
+  optional ApplicationAttemptIdProto attemptId = 1;
+  optional ContainerProto master_container = 2;
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java

@@ -56,7 +56,7 @@ public class TestAMRMClientAsync {
             BuilderUtils.newContainerId(0, 0, 0, 0),
             ContainerState.COMPLETE, "", 0));
     List<Container> allocated1 = Arrays.asList(
-        BuilderUtils.newContainer(null, null, null, null, null, null));
+        BuilderUtils.newContainer(null, null, null, null, null, null, 0));
     final AllocateResponse response1 = createAllocateResponse(
         new ArrayList<ContainerStatus>(), allocated1);
     final AllocateResponse response2 = createAllocateResponse(completed1,

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

@@ -237,9 +237,9 @@ public class BuilderUtils {
     return containerStatus;
   }
 
-  public static Container newContainer(ContainerId containerId,
-      NodeId nodeId, String nodeHttpAddress,
-      Resource resource, Priority priority, ContainerToken containerToken) {
+  public static Container newContainer(ContainerId containerId, NodeId nodeId,
+      String nodeHttpAddress, Resource resource, Priority priority,
+      ContainerToken containerToken, long rmIdentifier) {
     Container container = recordFactory.newRecordInstance(Container.class);
     container.setId(containerId);
     container.setNodeId(nodeId);
@@ -247,6 +247,7 @@ public class BuilderUtils {
     container.setResource(resource);
     container.setPriority(priority);
     container.setContainerToken(containerToken);
+    container.setRMIdentifier(rmIdentifier);
     return container;
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java

@@ -105,7 +105,7 @@ public class TestContainerLaunchRPC {
       containerId.setId(100);
       Container container =
           BuilderUtils.newContainer(containerId, null, null, recordFactory
-              .newRecordInstance(Resource.class), null, null);
+              .newRecordInstance(Resource.class), null, null, 0);
 
       StartContainerRequest scRequest = recordFactory
           .newRecordInstance(StartContainerRequest.class);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java

@@ -128,7 +128,7 @@ public class TestRPC {
     containerId.setId(100);
     Container mockContainer =
         BuilderUtils.newContainer(containerId, null, null, recordFactory
-            .newRecordInstance(Resource.class), null, null);
+            .newRecordInstance(Resource.class), null, null, 0);
 //    containerLaunchContext.env = new HashMap<CharSequence, CharSequence>();
 //    containerLaunchContext.command = new ArrayList<CharSequence>();
     

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerConstants.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+public interface ResourceManagerConstants {
+
+  public static final long RM_INVALID_IDENTIFIER = 0;
+}

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java

@@ -30,4 +30,7 @@ public interface RegisterNodeManagerResponse {
 
   void setNodeAction(NodeAction nodeAction);
 
+  long getRMIdentifier();
+
+  void setRMIdentifier(long rmIdentifier);
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java

@@ -121,6 +121,18 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
     rebuild = true;
   }
 
+  @Override
+  public long getRMIdentifier() {
+    RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.getRmIdentifier());
+  }
+
+  @Override
+  public void setRMIdentifier(long rmIdentifier) {
+    maybeInitBuilder();
+    builder.setRmIdentifier(rmIdentifier);
+  }
+
   private NodeAction convertFromProtoFormat(NodeActionProto p) {
     return  NodeAction.valueOf(p.name());
   }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -33,6 +33,7 @@ message RegisterNodeManagerRequestProto {
 message RegisterNodeManagerResponseProto {
   optional MasterKeyProto master_key = 1;
   optional NodeActionProto nodeAction = 2;
+  optional int64 rm_identifier = 3;
 }
 
 message NodeHeartbeatRequestProto {

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -48,4 +49,6 @@ public interface Context {
   NMContainerTokenSecretManager getContainerTokenSecretManager();
 
   NodeHealthStatus getNodeHealthStatus();
+
+  ContainerManager getContainerManager();
 }

+ 17 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -164,6 +165,7 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
     dirsHandler = nodeHealthChecker.getDiskHandler();
 
+
     nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
 
@@ -174,6 +176,7 @@ public class NodeManager extends CompositeService
         createContainerManager(context, exec, del, nodeStatusUpdater,
         this.aclsManager, dirsHandler);
     addService(containerManager);
+    ((NMContext) context).setContainerManager(containerManager);
 
     Service webServer = createWebServer(context, containerManager
         .getContainersMonitor(), this.aclsManager, dirsHandler);
@@ -221,11 +224,13 @@ public class NodeManager extends CompositeService
     DefaultMetricsSystem.shutdown();
   }
 
-  protected void cleanupContainersOnResync() {
+  protected void resyncWithRM() {
     //we do not want to block dispatcher thread here
     new Thread() {
       @Override
       public void run() {
+        LOG.info("Notifying ContainerManager to block new container-requests");
+        containerManager.setBlockNewContainerRequests(true);
         cleanupContainers(NodeManagerEventType.RESYNC);
         ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
       }
@@ -296,7 +301,7 @@ public class NodeManager extends CompositeService
         new ConcurrentSkipListMap<ContainerId, Container>();
 
     private final NMContainerTokenSecretManager containerTokenSecretManager;
-
+    private ContainerManager containerManager;
     private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
 
@@ -333,6 +338,15 @@ public class NodeManager extends CompositeService
     public NodeHealthStatus getNodeHealthStatus() {
       return this.nodeHealthStatus;
     }
+
+    @Override
+    public ContainerManager getContainerManager() {
+      return this.containerManager;
+    }
+
+    public void setContainerManager(ContainerManager containerManager) {
+      this.containerManager = containerManager;
+    }
   }
 
 
@@ -376,7 +390,7 @@ public class NodeManager extends CompositeService
       stop();
       break;
     case RESYNC:
-      cleanupContainersOnResync();
+      resyncWithRM();
       break;
     default:
       LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java

@@ -24,5 +24,8 @@ import org.apache.hadoop.yarn.service.Service;
 public interface NodeStatusUpdater extends Service {
 
   void sendOutofBandHeartBeat();
+
   NodeStatus getNodeStatusAndUpdateContainersInContext();
+
+  long getRMIdentifier();
 }

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -95,6 +97,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
   private Runnable statusUpdaterRunnable;
   private Thread  statusUpdater;
+  private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -267,6 +270,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         this.resourceTracker = getRMClient();
         regNMResponse =
             this.resourceTracker.registerNodeManager(request);
+        this.rmIdentifier = regNMResponse.getRMIdentifier();
         break;
       } catch(Throwable e) {
         LOG.warn("Trying to connect to ResourceManager, " +
@@ -308,7 +312,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
     LOG.info("Registered with ResourceManager as " + this.nodeId
         + " with total resource of " + this.totalResource);
-
+    LOG.info("Notifying ContainerManager to unblock new container-requests");
+    ((ContainerManagerImpl) this.context.getContainerManager())
+      .setBlockNewContainerRequests(false);
   }
 
   private List<ApplicationId> createKeepAliveApplicationList() {
@@ -334,6 +340,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return appList;
   }
 
+  @Override
   public NodeStatus getNodeStatusAndUpdateContainersInContext() {
 
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -407,6 +414,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     }
   }
 
+  @Override
+  public long getRMIdentifier() {
+    return this.rmIdentifier;
+  }
+
   protected void startStatusUpdater() {
 
     statusUpdaterRunnable = new Runnable() {
@@ -478,6 +490,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             if (response.getNodeAction() == NodeAction.RESYNC) {
               LOG.info("Node is out of sync with ResourceManager,"
                   + " hence rebooting.");
+              // Invalidate the RMIdentifier while resync
+              NodeStatusUpdaterImpl.this.rmIdentifier =
+                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.RESYNC));
               break;

+ 27 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -23,10 +23,9 @@ import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -125,6 +124,7 @@ public class ContainerManagerImpl extends CompositeService implements
   private final ApplicationACLsManager aclsManager;
 
   private final DeletionService deletionService;
+  private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -239,7 +239,10 @@ public class ContainerManagerImpl extends CompositeService implements
         false)) {
       refreshServiceAcls(conf, new NMPolicyProvider());
     }
-    
+
+    LOG.info("Blocking new container-requests as container manager rpc" +
+    		" server is still starting.");
+    this.setBlockNewContainerRequests(true);
     server.start();
     InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
     this.context.getNodeId().setHost(connectAddress.getHostName());
@@ -393,6 +396,13 @@ public class ContainerManagerImpl extends CompositeService implements
   @Override
   public StartContainerResponse startContainer(StartContainerRequest request)
       throws YarnRemoteException {
+
+    if (blockNewContainerRequests.get()) {
+      throw RPCUtil.getRemoteException(new NMNotYetReadyException(
+          "Rejecting new containers as NodeManager has not" +
+          " yet connected with ResourceManager"));
+    }
+
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
     org.apache.hadoop.yarn.api.records.Container lauchContainer =
         request.getContainer();
@@ -402,6 +412,16 @@ public class ContainerManagerImpl extends CompositeService implements
     UserGroupInformation remoteUgi = getRemoteUgi(containerIDStr);
     authorizeRequest(containerIDStr, launchContext, lauchContainer, remoteUgi);
 
+    // Is the container coming from an unknown RM?
+    if (lauchContainer.getRMIdentifer() != nodeStatusUpdater
+      .getRMIdentifier()) {
+      String msg = "\nContainer "+ containerIDStr
+          + " rejected as it is allocated by a previous RM";
+      LOG.error(msg);
+      throw RPCUtil
+        .getRemoteException(new InvalidContainerException(msg));
+    }
+
     LOG.info("Start request for " + containerIDStr + " by user "
         + launchContext.getUser());
 
@@ -615,6 +635,10 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+    this.blockNewContainerRequests.set(blockNewContainerRequests);
+  }
+
   @Override
   public void stateChanged(Service service) {
     // TODO Auto-generated method stub

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/InvalidContainerException.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * Thrown when the NM rejects a container-start request because the container
+ * was allocated by an unknown or previous RM.
+ */
+public class InvalidContainerException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public InvalidContainerException(String msg) {
+    super(msg);
+  }
+}

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/NMNotYetReadyException.java

@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.YarnException;
+
+/**
+ * Thrown when the NM has started from scratch (or is resyncing) but has not
+ * yet registered with the RM, and is therefore rejecting new container
+ * requests.
+ */
+public class NMNotYetReadyException extends YarnException {
+
+  private static final long serialVersionUID = 1L;
+
+  public NMNotYetReadyException(String msg) {
+    super(msg);
+  }
+}

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java

@@ -168,4 +168,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
       }
     };
   }
+
+  @Override
+  public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+    // do nothing
+  }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java

@@ -142,6 +142,17 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
     super.testLocalFilesCleanup();
   }
 
+  @Override
+  public void testContainerLaunchFromPreviousRM() throws InterruptedException,
+      IOException {
+    // Don't run the test if the binary is not available.
+    if (!shouldRunTest()) {
+      LOG.info("LCE binary path is not passed. Not running the test");
+      return;
+    }
+    LOG.info("Running testContainerLaunchFromPreviousRM");
+    super.testContainerLaunchFromPreviousRM();
+  }
   private boolean shouldRunTest() {
     return System
         .getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;

+ 315 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.NMNotYetReadyException;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeManagerResync {
+  static final File basedir =
+      new File("target", TestNodeManagerResync.class.getName());
+  static final File tmpDir = new File(basedir, "tmpDir");
+  static final File logsDir = new File(basedir, "logs");
+  static final File remoteLogsDir = new File(basedir, "remotelogs");
+  static final File nmLocalDir = new File(basedir, "nm0");
+  static final File processStartFile = new File(tmpDir, "start_file.txt")
+    .getAbsoluteFile();
+
+  static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
+  static final String user = "nobody";
+  private FileContext localFS;
+  private CyclicBarrier syncBarrier;
+  private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
+
+  @Before
+  public void setup() throws UnsupportedFileSystemException {
+    localFS = FileContext.getLocalFSFileContext();
+    tmpDir.mkdirs();
+    logsDir.mkdirs();
+    remoteLogsDir.mkdirs();
+    nmLocalDir.mkdirs();
+    syncBarrier = new CyclicBarrier(2);
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException {
+    localFS.delete(new Path(basedir.getPath()), true);
+    assertionFailedInThread.set(false);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillContainersOnResync() throws IOException,
+      InterruptedException {
+    NodeManager nm = new TestNodeManager1();
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    ContainerId cId = TestNodeManagerShutdown.createContainerId();
+    TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
+      processStartFile);
+
+    Assert.assertEquals(1, ((TestNodeManager1) nm).getNMRegistrationCount());
+    nm.getNMDispatcher().getEventHandler().
+        handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+    }
+    Assert.assertEquals(2, ((TestNodeManager1) nm).getNMRegistrationCount());
+
+    Assert.assertFalse(assertionFailedInThread.get());
+
+    nm.stop();
+  }
+
+  // This test verifies that new container requests are blocked when the NM
+  // starts from scratch until it registers with the RM, and also while the
+  // NM is resyncing with the RM.
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testBlockNewContainerRequestsOnStartAndResync()
+      throws IOException, InterruptedException {
+    NodeManager nm = new TestNodeManager2();
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+
+    // Start the container in running state
+    ContainerId cId = TestNodeManagerShutdown.createContainerId();
+    TestNodeManagerShutdown.startContainer(nm, cId, localFS, tmpDir,
+      processStartFile);
+
+    nm.getNMDispatcher().getEventHandler()
+      .handle(new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+    }
+    Assert.assertFalse(assertionFailedInThread.get());
+    nm.stop();
+  }
+
+  private YarnConfiguration createNMConfig() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
+    conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
+    conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+      remoteLogsDir.getAbsolutePath());
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
+    return conf;
+  }
+
+  class TestNodeManager1 extends NodeManager {
+
+    private int registrationCount = 0;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl1(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    public int getNMRegistrationCount() {
+      return registrationCount;
+    }
+
+    class TestNodeStatusUpdaterImpl1 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl1(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void registerWithRM() throws YarnRemoteException {
+        super.registerWithRM();
+        registrationCount++;
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+        .containermanager.container.Container> containers =
+            getNMContext().getContainers();
+        try {
+          // ensure that containers are empty before restarting nodeStatusUpdater
+          Assert.assertTrue(containers.isEmpty());
+          super.rebootNodeStatusUpdater();
+          syncBarrier.await();
+        } catch (InterruptedException e) {
+        } catch (BrokenBarrierException e) {
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+  }
+
+  class TestNodeManager2 extends NodeManager {
+
+    Thread launchContainersThread = null;
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl2(context, dispatcher,
+        healthChecker, metrics);
+    }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+        metrics, aclsManager, dirsHandler){
+        @Override
+        public void setBlockNewContainerRequests(
+            boolean blockNewContainerRequests) {
+          if (blockNewContainerRequests) {
+            // start test thread right after blockNewContainerRequests is set
+            // true
+            super.setBlockNewContainerRequests(blockNewContainerRequests);
+            launchContainersThread = new RejectedContainersLauncherThread();
+            launchContainersThread.start();
+          } else {
+            // join the test thread right before blockNewContainerRequests is
+            // reset
+            try {
+              // stop the test thread
+              ((RejectedContainersLauncherThread) launchContainersThread)
+                .setStopThreadFlag(true);
+              launchContainersThread.join();
+              ((RejectedContainersLauncherThread) launchContainersThread)
+              .setStopThreadFlag(false);
+              super.setBlockNewContainerRequests(blockNewContainerRequests);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+      };
+    }
+
+    class TestNodeStatusUpdaterImpl2 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl2(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
+        .containermanager.container.Container> containers =
+            getNMContext().getContainers();
+
+        try {
+          // ensure that containers are empty before restarting the nodeStatusUpdater
+          Assert.assertTrue(containers.isEmpty());
+          super.rebootNodeStatusUpdater();
+          // After this point new containers are free to be launched, except
+          // for containers from the previous RM.
+          // Wait here to sync with the main test thread.
+          syncBarrier.await();
+        } catch (InterruptedException e) {
+        } catch (BrokenBarrierException e) {
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+
+    class RejectedContainersLauncherThread extends Thread {
+
+      boolean isStopped = false;
+      public void setStopThreadFlag(boolean isStopped) {
+        this.isStopped = isStopped;
+      }
+
+      @Override
+      public void run() {
+        int numContainers = 0;
+        int numContainersRejected = 0;
+        ContainerLaunchContext containerLaunchContext =
+            recordFactory.newRecordInstance(ContainerLaunchContext.class);
+        try {
+          while (!isStopped && numContainers < 10) {
+            ContainerId cId = TestNodeManagerShutdown.createContainerId();
+            Container container =
+                BuilderUtils.newContainer(cId, null, null, null, null, null, 0);
+            StartContainerRequest startRequest =
+                recordFactory.newRecordInstance(StartContainerRequest.class);
+            startRequest.setContainerLaunchContext(containerLaunchContext);
+            startRequest.setContainer(container);
+            System.out.println("no. of containers to be launched: "
+                + numContainers);
+            numContainers++;
+            try {
+              getContainerManager().startContainer(startRequest);
+            } catch (YarnRemoteException e) {
+              numContainersRejected++;
+              Assert.assertTrue(e.getMessage().contains(
+                "Rejecting new containers as NodeManager has not" +
+                " yet connected with ResourceManager"));
+              // TODO: replace this with an explicit check of the exception
+              // class name after YARN-142
+              Assert.assertTrue(e.getRemoteTrace().contains(
+                NMNotYetReadyException.class.getName()));
+            }
+          }
+          // the number of containers launched should equal the number of
+          // containers rejected
+          Assert.assertEquals(numContainers, numContainersRejected);
+        } catch (AssertionError ae) {
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+  }
+}
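
Note: the resync tests above coordinate the main test thread and the updater-reboot thread through a two-party CyclicBarrier. The following standalone sketch (class and variable names are illustrative only, not part of the patch) shows the same handshake in isolation:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierHandshakeSketch {
  // Two parties: the main test thread and the thread performing the simulated reboot.
  private static final CyclicBarrier syncBarrier = new CyclicBarrier(2);

  public static void main(String[] args) throws Exception {
    Thread rebooter = new Thread(() -> {
      // ... the work of rebootNodeStatusUpdater() would happen here ...
      try {
        syncBarrier.await(); // signal the main thread once the reboot path has finished
      } catch (InterruptedException | BrokenBarrierException ignored) {
      }
    });
    rebooter.start();

    syncBarrier.await();     // main thread blocks until the reboot path reaches its await()
    rebooter.join();
    System.out.println("both threads passed the barrier");
  }
}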

+ 10 - 78
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java

@@ -24,17 +24,12 @@ import static org.mockito.Mockito.when;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -59,12 +54,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -86,7 +78,6 @@ public class TestNodeManagerShutdown {
   static final String user = "nobody";
   private FileContext localFS;
   private ContainerId cId;
-  private CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
   @Before
   public void setup() throws UnsupportedFileSystemException {
@@ -110,7 +101,7 @@ public class TestNodeManagerShutdown {
     NodeManager nm = getNodeManager();
     nm.init(createNMConfig());
     nm.start();
-    startContainers(nm);
+    startContainer(nm, cId, localFS, tmpDir, processStartFile);
     
     final int MAX_TRIES=20;
     int numTries = 0;
@@ -150,29 +141,13 @@ public class TestNodeManagerShutdown {
       reader.close();
     }
   }
-  
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testKillContainersOnResync() throws IOException, InterruptedException {
-    NodeManager nm = new TestNodeManager();
-    YarnConfiguration conf = createNMConfig();
-    nm.init(conf);
-    nm.start();
-    startContainers(nm);
-
-    assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
-    nm.getNMDispatcher().getEventHandler().
-        handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
-    try {
-      syncBarrier.await();
-    } catch (BrokenBarrierException e) {
-    }
-    assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
-  }
 
-  private void startContainers(NodeManager nm) throws IOException {
+  public static void startContainer(NodeManager nm, ContainerId cId,
+      FileContext localFS, File scriptFileDir, File processStartFile)
+      throws IOException {
     ContainerManagerImpl containerManager = nm.getContainerManager();
-    File scriptFile = createUnhaltingScriptFile();
+    File scriptFile =
+        createUnhaltingScriptFile(cId, scriptFileDir, processStartFile);
     
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -218,7 +193,7 @@ public class TestNodeManagerShutdown {
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
   }
   
-  private ContainerId createContainerId() {
+  public static ContainerId createContainerId() {
     ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
     appId.setClusterTimestamp(0);
     appId.setId(0);
@@ -247,8 +222,9 @@ public class TestNodeManagerShutdown {
    * Creates a script to run a container that will run forever unless
    * stopped by external means.
    */
-  private File createUnhaltingScriptFile() throws IOException {
-    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+  private static File createUnhaltingScriptFile(ContainerId cId,
+      File scriptFileDir, File processStartFile) throws IOException {
+    File scriptFile = Shell.appendScriptExtension(scriptFileDir, "scriptFile");
     PrintWriter fileWriter = new PrintWriter(scriptFile);
     if (Shell.WINDOWS) {
       fileWriter.println("@echo \"Running testscript for delayed kill\"");
@@ -282,48 +258,4 @@ public class TestNodeManagerShutdown {
       }
     };
   }
-
-  class TestNodeManager extends NodeManager {
-
-    private int registrationCount = 0;
-
-    @Override
-    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-      return new TestNodeStatusUpdaterImpl(context, dispatcher,
-          healthChecker, metrics);
-    }
-
-    public int getNMRegistrationCount() {
-      return registrationCount;
-    }
-
-    class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
-
-      public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
-          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
-        super(context, dispatcher, healthChecker, metrics);
-      }
-
-      @Override
-      protected void registerWithRM() throws YarnRemoteException {
-        super.registerWithRM();
-        registrationCount++;
-      }
-
-      @Override
-      protected void rebootNodeStatusUpdater() {
-        ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container> containers =
-            getNMContext().getContainers();
-        // ensure that containers are empty before restart nodeStatusUpdater
-        Assert.assertTrue(containers.isEmpty());
-        super.rebootNodeStatusUpdater();
-        try {
-          syncBarrier.await();
-        } catch (InterruptedException e) {
-        } catch (BrokenBarrierException e) {
-        }
-      }
-    }
-  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -156,7 +156,13 @@ public abstract class BaseContainerManagerTest {
     dirsHandler = nodeHealthChecker.getDiskHandler();
     containerManager =
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
-          metrics, new ApplicationACLsManager(conf), dirsHandler);
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
     containerManager.init(conf);
   }
 

+ 88 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -49,13 +52,18 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -63,7 +71,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
 
 public class TestContainerManager extends BaseContainerManagerTest {
 
@@ -411,7 +418,13 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     containerManager =
         new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
-          metrics, new ApplicationACLsManager(conf), dirsHandler);
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
     containerManager.init(conf);
     containerManager.start();
 
@@ -524,4 +537,77 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertFalse(targetFile.getAbsolutePath() + " exists!!",
         targetFile.exists());
   }
+
+  @Test
+  public void testContainerLaunchFromPreviousRM() throws IOException,
+      InterruptedException {
+    // There is no real RM registration; simulate one by setting the RMIdentifier
+    NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
+    when(nodeStatusUpdater.getRMIdentifier()).thenReturn((long) 1234);
+    containerManager =
+        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+          metrics, new ApplicationACLsManager(conf), dirsHandler) {
+          @Override
+          public void setBlockNewContainerRequests(
+              boolean blockNewContainerRequests) {
+            // do nothing
+          }
+        };
+    containerManager.init(conf);
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    ContainerId cId1 = createContainerId();
+    ContainerId cId2 = createContainerId();
+    containerLaunchContext.setUser(user);
+    containerLaunchContext
+      .setLocalResources(new HashMap<String, LocalResource>());
+    containerLaunchContext.setUser(containerLaunchContext.getUser());
+    Resource mockResource = mock(Resource.class);
+
+    Container mockContainer1 = mock(Container.class);
+    when(mockContainer1.getId()).thenReturn(cId1);
+    // Construct the Container with an invalid RMIdentifier
+    when(mockContainer1.getRMIdentifer()).thenReturn(
+      (long) ResourceManagerConstants.RM_INVALID_IDENTIFIER);
+    StartContainerRequest startRequest1 =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest1.setContainerLaunchContext(containerLaunchContext);
+    startRequest1.setContainer(mockContainer1);
+    boolean catchException = false;
+    try {
+      containerManager.startContainer(startRequest1);
+    } catch (YarnRemoteException e) {
+      catchException = true;
+      Assert.assertTrue(e.getMessage().contains(
+        "Container " + cId1 + " rejected as it is allocated by a previous RM"));
+      // TODO: replace this with an explicit check of the exception
+      // class name after YARN-142
+      Assert.assertTrue(e.getRemoteTrace().contains(
+        InvalidContainerException.class.getName()));
+    }
+
+    // Verify that startContainer fails because of the invalid container request
+    Assert.assertTrue(catchException);
+
+    // Construct the Container with an RMIdentifier matching the current RM
+    Container mockContainer2 = mock(Container.class);
+    when(mockContainer2.getId()).thenReturn(cId2);
+    when(mockContainer2.getRMIdentifer()).thenReturn((long) 1234);
+    when(mockContainer2.getResource()).thenReturn(mockResource);
+    StartContainerRequest startRequest2 =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
+    startRequest2.setContainerLaunchContext(containerLaunchContext);
+    startRequest2.setContainer(mockContainer2);
+    boolean noException = true;
+    try {
+      containerManager.startContainer(startRequest2);
+    } catch (YarnRemoteException e) {
+      noException = false;
+    }
+    // Verify that startContainer throws no YarnRemoteException
+    Assert.assertTrue(noException);
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -196,6 +196,7 @@ public class ResourceTrackerService extends AbstractService implements
         + capability + ", assigned nodeId " + nodeId);
 
     response.setNodeAction(NodeAction.NORMAL);
+    response.setRMIdentifier(ResourceManager.clusterTimeStamp);
     return response;
   }
 

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -33,11 +33,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java

@@ -25,8 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import com.google.common.annotations.VisibleForTesting;

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java

@@ -20,8 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
 
 @Unstable
 public class NullRMStateStore extends RMStateStore {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -30,12 +30,12 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptStateDataPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationAttemptStateData.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateData.java

@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
 
 /*
  * Contains the state data that needs to be persisted for an ApplicationAttempt

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationAttemptStateDataPBImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationAttemptStateDataPBImpl.java

@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records.impl.pb;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProtoOrBuilder;
 
 public class ApplicationAttemptStateDataPBImpl
 extends ProtoBase<ApplicationAttemptStateDataProto> 

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationStateData.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateData.java

@@ -16,10 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 
 /**
  * Contains all the state data that needs to be stored persistently 

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationStateDataPBImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/records/ApplicationStateDataPBImpl.java

@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.api.records.impl.pb;
+package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
 
-import org.apache.hadoop.yarn.api.records.ApplicationStateData;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateDataProtoOrBuilder;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProtoOrBuilder;
 
 public class ApplicationStateDataPBImpl 
 extends ProtoBase<ApplicationStateDataProto> 

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -1243,7 +1244,7 @@ public class LeafQueue implements CSQueue {
     // Create the container
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability, priority,
-        null);
+        null, ResourceManager.clusterTimeStamp);
   
     return container;
   }

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -173,7 +174,7 @@ public class AppSchedulable extends Schedulable {
     // Create the container
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability, priority,
-        containerToken);
+        containerToken, ResourceManager.clusterTimeStamp);
 
     return container;
   }
@@ -371,4 +372,4 @@ public class AppSchedulable extends Schedulable {
         Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
             request.getCapability(), node.getRMNode().getTotalCapability());
   }
-}
+}

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
@@ -565,7 +566,7 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
         // Create the container
         Container container = BuilderUtils.newContainer(containerId, nodeId,
             node.getRMNode().getHttpAddress(), capability, priority,
-            containerToken);
+            containerToken, ResourceManager.clusterTimeStamp);
         
         // Allocate!
         

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java

@@ -188,6 +188,7 @@ public class NodeManager implements ContainerManager {
             this.nodeId, nodeHttpAddress,
             requestContainer.getResource(),
             null, null                                 // DKDC - Doesn't matter
+            , 0
             );
 
     ContainerStatus containerStatus =

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
@@ -267,6 +268,21 @@ public class TestResourceTrackerService {
     Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
   }
 
+  @Test
+  public void testSetRMIdentifierInRegistration() throws Exception {
+
+    Configuration conf = new Configuration();
+    rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm = new MockNM("host1:1234", 5120, rm.getResourceTrackerService());
+    RegisterNodeManagerResponse response = nm.registerNode();
+
+    // Verify the RMIdentifier is correctly set in RegisterNodeManagerResponse
+    Assert.assertEquals(ResourceManager.clusterTimeStamp,
+      response.getRMIdentifier());
+  }
+
   @Test
   public void testReboot() throws Exception {
     Configuration conf = new Configuration();

+ 84 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationmasterservice/TestApplicationMasterService.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.applicationmasterservice;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestApplicationMasterService {
+  private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class);
+
+  private final int GB = 1024;
+  private static YarnConfiguration conf;
+
+  @BeforeClass
+  public static void setup() {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
+      ResourceScheduler.class);
+  }
+
+  @Test(timeout = 30000)
+  public void testRMIdentifierOnContainerAllocation() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("h1:1234", 6 * GB);
+
+    // Submit an application
+    RMApp app1 = rm.submitApp(2048);
+
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    am1.addRequests(new String[] { "h1" }, GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // kick the scheduler
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(1000);
+      alloc1Response = am1.schedule();
+    }
+
+    // assert the RMIdentifier is set properly in allocated containers
+    Assert.assertEquals(rm.clusterTimeStamp, alloc1Response
+      .getAllocatedContainers().get(0).getRMIdentifer());
+    rm.stop();
+  }
+}
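
Note: the scheduler changes in this commit pass ResourceManager.clusterTimeStamp into BuilderUtils.newContainer so every allocation records which RM incarnation created it, which is what this test asserts. A minimal sketch of the idea, using illustrative names rather than YARN's real records:

public class ClusterTimestampSketch {
  // Fixed when the (simulated) RM starts; a restarted RM would get a new value.
  static final long clusterTimeStamp = System.currentTimeMillis();

  static class AllocatedContainer {
    final String id;
    final long rmIdentifier;
    AllocatedContainer(String id, long rmIdentifier) {
      this.id = id;
      this.rmIdentifier = rmIdentifier;
    }
  }

  // Every allocated container carries the identifier of the RM that created it.
  static AllocatedContainer allocate(String id) {
    return new AllocatedContainer(id, clusterTimeStamp);
  }

  public static void main(String[] args) {
    AllocatedContainer c = allocate("container_1");
    System.out.println(c.id + " allocated by RM incarnation " + c.rmIdentifier);
  }
}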

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -69,7 +69,7 @@ public class TestRMContainerImpl {
     Priority priority = BuilderUtils.newPriority(5);
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
-        "host:3465", resource, priority, null);
+        "host:3465", resource, priority, null, 0);
 
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
         nodeId, eventHandler, expirer);
@@ -139,7 +139,7 @@ public class TestRMContainerImpl {
     Priority priority = BuilderUtils.newPriority(5);
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
-        "host:3465", resource, priority, null);
+        "host:3465", resource, priority, null, 0);
 
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
         nodeId, eventHandler, expirer);

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

@@ -360,7 +360,7 @@ public class TestContainerManagerSecurity {
         Container container =
             BuilderUtils.newContainer(newTokenId.getContainerID(), null, null,
                 BuilderUtils.newResource(newTokenId.getResource().getMemory(),
-                    newTokenId.getResource().getVirtualCores()), null, null);
+                    newTokenId.getResource().getVirtualCores()), null, null, 0);
         StartContainerRequest request = Records.newRecord(StartContainerRequest.class);
         request.setContainerLaunchContext(context);
         request.setContainer(container);
@@ -547,7 +547,7 @@ public class TestContainerManagerSecurity {
         createContainerLaunchContextForTest(tokenId);
     Container container =
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
-            BuilderUtils.newResource(2048, 1), null, null);
+            BuilderUtils.newResource(2048, 1), null, null, 0);
     request.setContainerLaunchContext(context);
     request.setContainer(container);
     try {
@@ -575,7 +575,7 @@ public class TestContainerManagerSecurity {
     Container container =
         BuilderUtils.newContainer(tokenId.getContainerID(), null, null,
             BuilderUtils.newResource(tokenId.getResource().getMemory(), tokenId
-                .getResource().getVirtualCores()), null, null);
+                .getResource().getVirtualCores()), null, null, 0);
     request.setContainerLaunchContext(context);
     request.setContainer(container);
     try {

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestWebAppProxyServer.java

@@ -0,0 +1,52 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.webproxy;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWebAppProxyServer {
+  private WebAppProxyServer webAppProxy = null;
+
+  @Before
+  public void setUp() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.PROXY_ADDRESS, "0.0.0.0:8888");
+    webAppProxy = new WebAppProxyServer();
+    webAppProxy.init(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    webAppProxy.stop();
+  }
+
+  @Test
+  public void testStart() {
+    assertEquals(STATE.INITED, webAppProxy.getServiceState());
+    webAppProxy.start();
+    assertEquals(STATE.STARTED, webAppProxy.getServiceState());
+  }
+}