
Merge -r 1476394:1476395 from trunk onto branch-2. Fixes HDFS-2576.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1480135 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 12 years ago
Commit 471b37704a
20 files changed, 539 insertions(+), 41 deletions(-)
  1. + 22 - 0   hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
  2. + 3 - 0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  3. + 34 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
  4. + 21 - 5   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  5. + 21 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  6. + 4 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
  7. + 4 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
  8. + 5 - 1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
  9. + 25 - 4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  10. + 19 - 0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
  11. + 69 - 9  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  12. + 71 - 4  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  13. + 4 - 3   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  14. + 6 - 2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  15. + 1 - 0   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
  16. + 3 - 3   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
  17. + 2 - 2   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
  18. + 1 - 1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
  19. + 4 - 3   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java
  20. + 220 - 0 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java

+ 22 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -377,6 +377,28 @@ public class NetworkTopology {
     }
   }
     
+  /**
+   * Given a string representation of a rack, return its children
+   * @param loc a path-like string representation of a rack
+   * @return a newly allocated list of all the datanodes in the rack
+   */
+  public List<Node> getDatanodesInRack(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc)) {
+        loc = loc.substring(1);
+      }
+      InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+      if (rack == null) {
+        return null;
+      }
+      return new ArrayList<Node>(rack.getChildren());
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
   /** Remove a node
    * Update node counter and rack counter if necessary
    * @param node node to be removed; can be null
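
For illustration, a minimal sketch of exercising the new getDatanodesInRack lookup (assumes hadoop-common on the classpath; the host names and rack paths are made up):

import java.util.List;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class RackLookupSketch {
  public static void main(String[] args) {
    NetworkTopology topology = new NetworkTopology();
    // NodeBase(name, location): leaves live under path-like rack locations
    topology.add(new NodeBase("host1", "/rack1"));
    topology.add(new NodeBase("host2", "/rack1"));
    topology.add(new NodeBase("host3", "/rack2"));
    List<Node> inRack = topology.getDatanodesInRack("/rack1");
    System.out.println(inRack.size());                         // 2
    // An unknown rack yields null rather than an empty list
    System.out.println(topology.getDatanodesInRack("/rack9")); // null
  }
}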

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -27,6 +27,9 @@ Release 2.0.5-beta - UNRELEASED
     HDFS-4502. JsonUtil.toFileStatus(..) should check if the fileId property
     exists.  (Brandon Li via suresh)
 
+    HDFS-2576. Enhances the DistributedFileSystem's create API so that clients
+    can specify favored datanodes for a file's blocks. (ddas)
+
   IMPROVEMENTS
 
     HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 

+ 34 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -1232,7 +1232,7 @@ public class DFSClient implements java.io.Closeable {
                              ChecksumOpt checksumOpt)
       throws IOException {
     return create(src, permission, flag, true,
-        replication, blockSize, progress, buffersize, checksumOpt);
+        replication, blockSize, progress, buffersize, checksumOpt, null);
   }
 
   /**
@@ -1266,6 +1266,29 @@ public class DFSClient implements java.io.Closeable {
                              Progressable progress,
                              int buffersize,
                              ChecksumOpt checksumOpt) throws IOException {
+    return create(src, permission, flag, createParent, replication, blockSize, 
+        progress, buffersize, checksumOpt, null);
+  }
+
+  /**
+   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
+   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes, a
+   * hint about where the namenode should place the file's blocks.
+   * The favored-nodes hint is not persisted in HDFS, so it is honored only
+   * at creation time; balancing or re-replication may later move blocks
+   * away from the favored nodes. A value of null means no favored nodes
+   * for this create.
+   */
+  public DFSOutputStream create(String src, 
+                             FsPermission permission,
+                             EnumSet<CreateFlag> flag, 
+                             boolean createParent,
+                             short replication,
+                             long blockSize,
+                             Progressable progress,
+                             int buffersize,
+                             ChecksumOpt checksumOpt,
+                             InetSocketAddress[] favoredNodes) throws IOException {
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getFileDefault();
@@ -1274,9 +1297,18 @@ public class DFSClient implements java.io.Closeable {
     if(LOG.isDebugEnabled()) {
       LOG.debug(src + ": masked=" + masked);
     }
+    String[] favoredNodeStrs = null;
+    if (favoredNodes != null) {
+      favoredNodeStrs = new String[favoredNodes.length];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodeStrs[i] = 
+            favoredNodes[i].getAddress().getHostAddress() + ":" 
+                         + favoredNodes[i].getPort();
+      }
+    }
     final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
         src, masked, flag, createParent, replication, blockSize, progress,
-        buffersize, dfsClientConf.createChecksum(checksumOpt));
+        buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
     beginFileLease(src, result);
     return result;
   }

+ 21 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -323,6 +323,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
             return key;
           }
         });
+    private String[] favoredNodes;
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
     private BlockConstructionStage stage;  // block construction stage
@@ -399,7 +400,11 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
 
       }
     }
-    
+
+    private void setFavoredNodes(String[] favoredNodes) {
+      this.favoredNodes = favoredNodes;
+    }
+
     /**
      * Initialize for data streaming
      */
@@ -1181,7 +1186,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         while (true) {
           try {
             return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-                block, excludedNodes, fileId);
+                block, excludedNodes, fileId, favoredNodes);
           } catch (RemoteException e) {
             IOException ue = 
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -1321,7 +1326,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
   /** Construct a new output stream for creating a file. */
   private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     this(dfsClient, src, progress, stat, checksum);
     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
 
@@ -1329,12 +1334,15 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
         checksum.getBytesPerChecksum());
 
     streamer = new DataStreamer();
+    if (favoredNodes != null && favoredNodes.length != 0) {
+      streamer.setFavoredNodes(favoredNodes);
+    }
   }
 
   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize, Progressable progress, int buffersize,
-      DataChecksum checksum) throws IOException {
+      DataChecksum checksum, String[] favoredNodes) throws IOException {
     final HdfsFileStatus stat;
     try {
       stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
@@ -1351,11 +1359,19 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
                                      UnresolvedPathException.class);
     }
     final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-        flag, progress, checksum);
+        flag, progress, checksum, favoredNodes);
     out.start();
     return out;
   }
 
+  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
+      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize, Progressable progress, int buffersize,
+      DataChecksum checksum) throws IOException {
+    return newStreamForCreate(dfsClient, src, masked, flag, createParent, replication,
+        blockSize, progress, buffersize, checksum, null);
+  }
+
   /** Construct a new output stream for append. */
   private DFSOutputStream(DFSClient dfsClient, String src,
       Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -265,6 +265,27 @@ public class DistributedFileSystem extends FileSystem {
             : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
         blockSize, progress, null);
   }
+
+  /**
+   * Same as
+   * {@link #create(Path, FsPermission, boolean, int, short, long,
+   * Progressable)} with the addition of favoredNodes, a hint about where
+   * the namenode should place the file's blocks.
+   * The favored-nodes hint is not persisted in HDFS, so it is honored only
+   * at creation time; balancing or re-replication may later move blocks
+   * away from the favored nodes. A value of null means no favored nodes
+   * for this create.
+   */
+  public HdfsDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress, InetSocketAddress[] favoredNodes) throws IOException {
+    statistics.incrementWriteOps(1);
+    final DFSOutputStream out = dfs.create(getPathName(f), permission,
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE),
+        true, replication, blockSize, progress, bufferSize, null, favoredNodes);
+    return new HdfsDataOutputStream(out, statistics);
+  }
   
   @Override
   public HdfsDataOutputStream create(Path f, FsPermission permission,
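
Putting the client-side pieces together, a hedged end-to-end sketch of the new overload (assumes a reachable HDFS is the default filesystem; the datanode transfer addresses are illustrative):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class FavoredNodesCreateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // Hint: ask for replicas on these datanodes (ip:xferPort); the namenode
    // may ignore the hint depending on cluster state.
    InetSocketAddress[] favored = {
        new InetSocketAddress("10.0.0.1", 50010),
        new InetSocketAddress("10.0.0.2", 50010),
        new InetSocketAddress("10.0.0.3", 50010)
    };
    FSDataOutputStream out = dfs.create(new Path("/tmp/favored-file"),
        FsPermission.getFileDefault(), true /* overwrite */,
        4096, (short) 3, 128 * 1024 * 1024L, null /* progress */, favored);
    out.write("hello".getBytes());
    out.close();
  }
}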

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -300,6 +300,8 @@ public interface ClientProtocol {
    * @param excludeNodes a list of nodes that should not be
    * allocated for the current block
    * @param fileId the id uniquely identifying a file
+   * @param favoredNodes the list of nodes where the client wants the blocks
+   *          placed. Nodes are identified by either host name or address.
    *
    * @return LocatedBlock allocated block information.
    *
@@ -314,7 +316,8 @@ public interface ClientProtocol {
    */
   @Idempotent
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, 
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java

@@ -355,12 +355,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
     
     try {
       List<DatanodeInfoProto> excl = req.getExcludeNodesList();
+      List<String> favor = req.getFavoredNodesList();
       LocatedBlock result = server.addBlock(
           req.getSrc(),
           req.getClientName(),
           req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
           (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
-              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId());
+              .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
+          (favor == null || favor.size() == 0) ? null : favor
+              .toArray(new String[favor.size()]));
       return AddBlockResponseProto.newBuilder()
           .setBlock(PBHelper.convert(result)).build();
     } catch (IOException e) {

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java

@@ -302,7 +302,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+      String[] favoredNodes)
       throws AccessControlException, FileNotFoundException,
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -312,6 +313,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
       req.setPrevious(PBHelper.convert(previous)); 
     if (excludeNodes != null) 
       req.addAllExcludeNodes(PBHelper.convert(excludeNodes));
+    if (favoredNodes != null) {
+      req.addAllFavoredNodes(Arrays.asList(favoredNodes));
+    }
     try {
       return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {

+ 25 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -1333,11 +1335,12 @@ public class BlockManager {
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final HashMap<Node, Node> excludedNodes,
-      final long blocksize) throws IOException {
-    // choose targets for the new block to be allocated.
+      final long blocksize, List<String> favoredNodes) throws IOException {
+    List<DatanodeDescriptor> favoredDatanodeDescriptors = 
+        getDatanodeDescriptors(favoredNodes);
     final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
-        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
-        excludedNodes, blocksize);
+        numOfReplicas, client, excludedNodes, blocksize, 
+        favoredDatanodeDescriptors);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -1350,6 +1353,24 @@ public class BlockManager {
     return targets;
   }
 
+  /**
+   * Get a list of datanode descriptors for the given list of nodes. Each
+   * node is identified as either hostaddress:xferPort or just hostaddress.
+   */
+  List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
+    List<DatanodeDescriptor> datanodeDescriptors = null;
+    if (nodes != null) {
+      datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
+      for (int i = 0; i < nodes.size(); i++) {
+        DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
+        if (node != null) {
+          datanodeDescriptors.add(node);
+        }
+      }
+    }
+    return datanodeDescriptors;
+  }
+
   /**
    * Parse the data-nodes the block belongs to and choose one,
    * which will be the replication source.

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -116,6 +116,25 @@ public abstract class BlockPlacementPolicy {
     return chooseTarget(srcBC.getName(), numOfReplicas, writer,
                         chosenNodes, false, excludedNodes, blocksize);
   }
+  
+  /**
+   * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean,
+   * HashMap, long)} with the added parameter {@code favoredNodes}.
+   * @param favoredNodes datanodes that should be favored as targets. This
+   *          is only a hint; depending on cluster state, the namenode may
+   *          not be able to place the blocks on these datanodes.
+   */
+  DatanodeDescriptor[] chooseTarget(String src,
+      int numOfReplicas, DatanodeDescriptor writer,
+      HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    // This class does not provide the functionality of placing
+    // a block in favored datanodes. The implementations of this class
+    // are expected to provide this functionality
+    return chooseTarget(src, numOfReplicas, writer, 
+        new ArrayList<DatanodeDescriptor>(numOfReplicas), false, excludedNodes, 
+        blocksize);
+  }
 
   /**
    * Verify that the block is replicated on at least minRacks different racks
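
Because the base-class overload deliberately falls back to regular placement, a policy must override it to honor the hint. A hypothetical subclass sketch (the overload is package-private, so this would have to live in org.apache.hadoop.hdfs.server.blockmanagement; the class name and logging are illustrative):

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.Node;

public class AuditingFavoredNodesPolicy extends BlockPlacementPolicyDefault {
  private static final Log AUDIT =
      LogFactory.getLog(AuditingFavoredNodesPolicy.class);

  @Override
  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
      long blocksize, List<DatanodeDescriptor> favoredNodes) {
    if (favoredNodes != null && !favoredNodes.isEmpty()) {
      AUDIT.info("favored-nodes hint for " + src + ": " + favoredNodes);
    }
    // Delegate to the default policy's favored-node-aware implementation
    return super.chooseTarget(src, numOfReplicas, writer, excludedNodes,
        blocksize, favoredNodes);
  }
}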

+ 69 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -125,6 +125,60 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         excludedNodes, blocksize);
   }
 
+  @Override
+  DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
+      DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
+      long blocksize, List<DatanodeDescriptor> favoredNodes) {
+    try {
+      if (favoredNodes == null || favoredNodes.size() == 0) {
+        // Favored nodes not specified, fall back to regular block placement.
+        return chooseTarget(src, numOfReplicas, writer,
+            new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+            excludedNodes, blocksize);
+      }
+
+      HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
+          new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+
+      // Choose favored nodes
+      List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+      boolean avoidStaleNodes = stats != null
+          && stats.isAvoidingStaleDataNodesForWrite();
+      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+        DatanodeDescriptor favoredNode = favoredNodes.get(i);
+        // Choose a single node which is local to favoredNode.
+        // 'results' is updated within chooseLocalNode
+        DatanodeDescriptor target = chooseLocalNode(favoredNode,
+            favoriteAndExcludedNodes, blocksize, 
+            getMaxNodesPerRack(results, 
+                numOfReplicas)[1], results, avoidStaleNodes);
+        if (target == null) {
+          LOG.warn("Could not find a target for file " + src
+              + " with favored node " + favoredNode); 
+          continue;
+        }
+        favoriteAndExcludedNodes.put(target, target);
+      }
+
+      if (results.size() < numOfReplicas) {        
+        // Not enough favored nodes, choose other nodes.
+        numOfReplicas -= results.size();
+        DatanodeDescriptor[] remainingTargets = 
+            chooseTarget(src, numOfReplicas, writer, results,
+                false, favoriteAndExcludedNodes, blocksize);
+        for (int i = 0; i < remainingTargets.length; i++) {
+          results.add(remainingTargets[i]);
+        }
+      }
+      return results.toArray(new DatanodeDescriptor[results.size()]);
+    } catch (NotEnoughReplicasException nr) {
+      // Fall back to regular block placement disregarding favored nodes hint
+      return chooseTarget(src, numOfReplicas, writer, 
+          new ArrayList<DatanodeDescriptor>(numOfReplicas), false, 
+          excludedNodes, blocksize);
+    }
+  }
+
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -140,15 +194,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       excludedNodes = new HashMap<Node, Node>();
     }
      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
+    numOfReplicas = result[0];
+    int maxNodesPerRack = result[1];
       
     List<DatanodeDescriptor> results = 
       new ArrayList<DatanodeDescriptor>(chosenNodes);
@@ -172,6 +220,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return getPipeline((writer==null)?localNode:writer,
                        results.toArray(new DatanodeDescriptor[results.size()]));
   }
+
+  private int[] getMaxNodesPerRack(List<DatanodeDescriptor> chosenNodes,
+      int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
     
   /* choose <i>numOfReplicas</i> from all data nodes */
   private DatanodeDescriptor chooseTarget(int numOfReplicas,
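
A quick worked check of the extracted helper's arithmetic, maxNodesPerRack = (totalReplicas - 1) / numRacks + 2, including the clamp applied when more replicas are requested than the cluster has leaves (the numbers are illustrative):

public class MaxNodesPerRackSketch {
  // Mirrors getMaxNodesPerRack(chosenNodes, numOfReplicas) above
  static int[] maxNodesPerRack(int chosen, int numOfReplicas,
      int clusterSize, int numRacks) {
    int total = chosen + numOfReplicas;
    if (total > clusterSize) {       // can't place more than one per leaf
      numOfReplicas -= (total - clusterSize);
      total = clusterSize;
    }
    return new int[] {numOfReplicas, (total - 1) / numRacks + 2};
  }

  public static void main(String[] args) {
    // 3 replicas on a 10-node, 3-rack cluster: (3-1)/3 + 2 = 2 per rack
    System.out.println(maxNodesPerRack(0, 3, 10, 3)[1]);  // 2
    // 12 replicas requested on 10 nodes: replica count clamps to 10
    System.out.println(maxNodesPerRack(0, 12, 10, 3)[0]); // 10
  }
}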

+ 71 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -313,6 +313,68 @@ public class DatanodeManager {
     return host2DatanodeMap.getDatanodeByHost(host);
   }
 
+  /** @return the datanode descriptor matching the given host and xfer port. */
+  public DatanodeDescriptor getDatanodeByXferAddr(String host, int xferPort) {
+    return host2DatanodeMap.getDatanodeByXferAddr(host, xferPort);
+  }
+
+  /**
+   * Given a datanode address or host name, returns the matching
+   * DatanodeDescriptor. If no datanode matches, it falls back to a
+   * machine-local datanode, then to a rack-local one; if neither exists,
+   * it returns the DatanodeDescriptor of a random node in the cluster.
+   *
+   * @param address hostaddress:xferPort, or just hostaddress
+   * @return the best match for the given datanode
+   */
+  DatanodeDescriptor getDatanodeDescriptor(String address) {
+    DatanodeDescriptor node = null;
+    int colon = address.indexOf(":");
+    int xferPort;
+    String host = address;
+    if (colon > 0) {
+      host = address.substring(0, colon);
+      xferPort = Integer.parseInt(address.substring(colon+1));
+      node = getDatanodeByXferAddr(host, xferPort);
+    }
+    if (node == null) {
+      node = getDatanodeByHost(host);
+    }
+    if (node == null) {
+      String networkLocation = resolveNetworkLocation(host);
+
+      // If the current cluster doesn't contain the node, fallback to
+      // something machine local and then rack local.
+      List<Node> rackNodes = getNetworkTopology()
+                                   .getDatanodesInRack(networkLocation);
+      if (rackNodes != null) {
+        // Try something machine local.
+        for (Node rackNode : rackNodes) {
+          if (((DatanodeDescriptor) rackNode).getIpAddr().equals(host)) {
+            node = (DatanodeDescriptor) rackNode;
+            break;
+          }
+        }
+
+        // Try something rack local.
+        if (node == null && !rackNodes.isEmpty()) {
+          node = (DatanodeDescriptor) (rackNodes
+              .get(DFSUtil.getRandom().nextInt(rackNodes.size())));
+        }
+      }
+
+      // If we can't even choose rack local, just choose any node in the
+      // cluster.
+      if (node == null) {
+        node = (DatanodeDescriptor)getNetworkTopology()
+                                   .chooseRandom(NodeBase.ROOT);
+      }
+    }
+    return node;
+  }
+
+
   /** Get a datanode descriptor given corresponding storageID */
   DatanodeDescriptor getDatanode(final String storageID) {
     return datanodeMap.get(storageID);
@@ -442,8 +504,13 @@ public class DatanodeManager {
     }
   }
 
+  public String resolveNetworkLocation(String host) {
+    DatanodeID d = parseDNFromHostsEntry(host);
+    return resolveNetworkLocation(d);
+  }
+
   /* Resolve a node's network location */
-  private void resolveNetworkLocation (DatanodeDescriptor node) {
+  private String resolveNetworkLocation (DatanodeID node) {
     List<String> names = new ArrayList<String>(1);
     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
       names.add(node.getIpAddr());
@@ -461,7 +528,7 @@ public class DatanodeManager {
     } else {
       networkLocation = rName.get(0);
     }
-    node.setNetworkLocation(networkLocation);
+    return networkLocation;
   }
 
   private boolean inHostsList(DatanodeID node) {
@@ -694,7 +761,7 @@ public class DatanodeManager {
           nodeS.setDisallowed(false); // Node is in the include list
           
           // resolve network location
-          resolveNetworkLocation(nodeS);
+          nodeS.setNetworkLocation(resolveNetworkLocation(nodeS));
           getNetworkTopology().add(nodeS);
             
           // also treat the registration message as a heartbeat
@@ -726,7 +793,7 @@ public class DatanodeManager {
         = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
       boolean success = false;
       try {
-        resolveNetworkLocation(nodeDescr);
+        nodeDescr.setNetworkLocation(resolveNetworkLocation(nodeDescr));
         networktopology.add(nodeDescr);
   
         // register new datanode
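
The address split at the top of getDatanodeDescriptor is easy to check in isolation; a minimal sketch of just the parsing (the fallback ladder after it needs a live topology, so it is only summarized in comments):

public class FavoredNodeAddressSketch {
  public static void main(String[] args) {
    for (String address : new String[] {"10.0.0.5:50010", "10.0.0.5"}) {
      int colon = address.indexOf(':');
      String host = colon > 0 ? address.substring(0, colon) : address;
      String port = colon > 0 ? address.substring(colon + 1) : "(none)";
      // With a port: exact getDatanodeByXferAddr lookup first; otherwise
      // getDatanodeByHost, then machine-local, rack-local, and finally a
      // random node in the cluster.
      System.out.println(host + " -> " + port);
    }
  }
}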

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2208,7 +2208,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * client to "try again later".
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, HashMap<Node, Node> excludedNodes)
+      ExtendedBlock previous, HashMap<Node, Node> excludedNodes, 
+      List<String> favoredNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
       IOException {
@@ -2253,8 +2254,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
 
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget(
-        src, replication, clientNode, excludedNodes, blockSize);
+    final DatanodeDescriptor targets[] = getBlockManager().chooseTarget( 
+        src, replication, clientNode, excludedNodes, blockSize, favoredNodes);
 
     // Part II.
     // Allocate a new block, add it to the INode and the BlocksMap. 

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -473,7 +474,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
   
   @Override
   public LocatedBlock addBlock(String src, String clientName,
-      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId)
+      ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
+      String[] favoredNodes)
       throws IOException {
     if (stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
@@ -486,8 +488,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
         excludedNodesSet.put(node, node);
       }
     }
+    List<String> favoredNodesList = (favoredNodes == null) ? null
+        : Arrays.asList(favoredNodes);
     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
-        clientName, previous, excludedNodesSet);
+        clientName, previous, excludedNodesSet, favoredNodesList);
     if (locatedBlock != null)
       metrics.incrAddBlockOps();
     return locatedBlock;

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto

@@ -121,6 +121,7 @@ message AddBlockRequestProto {
   optional ExtendedBlockProto previous = 3;
   repeated DatanodeInfoProto excludeNodes = 4;
   optional uint64 fileId = 5 [default = 0];  // default as a bogus id
+  repeated string favoredNodes = 6; //the set of datanodes to use for the block
 }
 
 message AddBlockResponseProto {
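
For reference, a hedged sketch of what the client-side translator now puts on the wire, built with the classes generated from this proto (the generated package name matches the Hadoop 2 sources; the values are illustrative):

import java.util.Arrays;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;

public class AddBlockRequestSketch {
  public static void main(String[] args) {
    AddBlockRequestProto req = AddBlockRequestProto.newBuilder()
        .setSrc("/tmp/favored-file")
        .setClientName("DFSClient_example")
        // fileId is optional and defaults to the bogus id 0
        .addAllFavoredNodes(Arrays.asList("10.0.0.1:50010", "10.0.0.2:50010"))
        .build();
    System.out.println(req.getFavoredNodesList());
  }
}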

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -241,7 +241,7 @@ public class TestDFSClientRetries {
                          anyString(),
                          any(ExtendedBlock.class),
                          any(DatanodeInfo[].class),
-                         anyLong())).thenAnswer(answer);
+                         anyLong(), any(String[].class))).thenAnswer(answer);
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
@@ -390,7 +390,7 @@ public class TestDFSClientRetries {
         }
       }).when(spyNN).addBlock(Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
 
       doAnswer(new Answer<Boolean>() {
 
@@ -432,7 +432,7 @@ public class TestDFSClientRetries {
       Mockito.verify(spyNN, Mockito.atLeastOnce()).addBlock(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock> any(), Mockito.<DatanodeInfo[]> any(),
-          Mockito.anyLong());
+          Mockito.anyLong(), Mockito.<String[]> any());
       Mockito.verify(spyNN, Mockito.atLeastOnce()).complete(
           Mockito.anyString(), Mockito.anyString(),
           Mockito.<ExtendedBlock>any());

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -519,7 +519,7 @@ public class TestFileCreation {
 
       // add one block to the file
       LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
-          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID);
+          client.clientName, null, null, INodeId.GRANDFATHER_INODE_ID, null);
       System.out.println("testFileCreationError2: "
           + "Added block " + location.getBlock());
 
@@ -570,7 +570,7 @@ public class TestFileCreation {
       createFile(dfs, f, 3);
       try {
         cluster.getNameNodeRpc().addBlock(f.toString(), client.clientName,
-            null, null, INodeId.GRANDFATHER_INODE_ID);
+            null, null, INodeId.GRANDFATHER_INODE_ID, null);
         fail();
       } catch(IOException ioe) {
         FileSystem.LOG.info("GOOD!", ioe);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -1059,7 +1059,7 @@ public class NNThroughputBenchmark {
       ExtendedBlock prevBlock = null;
       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
         LocatedBlock loc = nameNodeProto.addBlock(fileName, clientName,
-            prevBlock, null, INodeId.GRANDFATHER_INODE_ID);
+            prevBlock, null, INodeId.GRANDFATHER_INODE_ID, null);
         prevBlock = loc.getBlock();
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getXferAddr());

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddBlockRetry.java

@@ -26,6 +26,7 @@ import static org.mockito.Mockito.spy;
 import java.lang.reflect.Field;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -108,7 +109,7 @@ public class TestAddBlockRetry {
         if(count == 1) { // run second addBlock()
           LOG.info("Starting second addBlock for " + src);
           nn.addBlock(src, "clientName", null, null,
-              INodeId.GRANDFATHER_INODE_ID);
+              INodeId.GRANDFATHER_INODE_ID, null);
           LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);
           assertEquals("Must be one block", 1, lbs.getLocatedBlocks().size());
           lb2 = lbs.get(0);
@@ -119,7 +120,7 @@ public class TestAddBlockRetry {
       }
     }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
         Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
-        Mockito.anyLong());
+        Mockito.anyLong(), Mockito.<List<String>>any());
 
     // create file
     nn.create(src, FsPermission.getFileDefault(),
@@ -129,7 +130,7 @@ public class TestAddBlockRetry {
 
     // start first addBlock()
     LOG.info("Starting first addBlock for " + src);
-    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID);
+    nn.addBlock(src, "clientName", null, null, INodeId.GRANDFATHER_INODE_ID, null);
 
     // check locations
     LocatedBlocks lbs = nn.getBlockLocations(src, 0, Long.MAX_VALUE);

+ 220 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java

@@ -0,0 +1,220 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+
+public class TestFavoredNodesEndToEnd {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private final static int NUM_DATA_NODES = 10;
+  private final static int NUM_FILES = 10;
+  private final static byte[] SOME_BYTES = new String("foo").getBytes();
+  private static DistributedFileSystem dfs;
+  private static ArrayList<DataNode> datanodes;
+  
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
+        .build();
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    datanodes = cluster.getDataNodes();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    if (cluster != null) { 
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFavoredNodesEndToEnd() throws Exception {
+    //create 10 files with random preferred nodes
+    for (int i = 0; i < NUM_FILES; i++) {
+      Random rand = new Random(System.currentTimeMillis() + i);
+      //pass a newly created Random to get a uniform distribution each time
+      //without too many collisions (see the do-while loops in getDatanodes)
+      InetSocketAddress datanode[] = getDatanodes(rand);
+      Path p = new Path("/filename"+i);
+      FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+          4096, (short)3, (long)4096, null, datanode);
+      out.write(SOME_BYTES);
+      out.close();
+      BlockLocation[] locations = 
+          dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+              Long.MAX_VALUE);
+      //make sure we have exactly one block location, and three hosts
+      assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+      //verify the files got created in the right nodes
+      for (BlockLocation loc : locations) {
+        String[] hosts = loc.getNames();
+        String[] hosts1 = getStringForInetSocketAddrs(datanode);
+        assertTrue(compareNodes(hosts, hosts1));
+      }
+    }
+  }
+
+  @Test
+  public void testWhenFavoredNodesNotPresent() throws Exception {
+    //when we ask for favored nodes but the nodes are not there, we should
+    //get some other nodes. In other words, the write to HDFS should not fail,
+    //and getBlockLocations on the file should report one block location
+    //with three hosts
+    Random rand = new Random(System.currentTimeMillis());
+    InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
+    for (int i = 0; i < 3; i++) {
+      arbitraryAddrs[i] = getArbitraryLocalHostAddr();
+    }
+    Path p = new Path("/filename-foo-bar");
+    FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+        4096, (short)3, (long)4096, null, arbitraryAddrs);
+    out.write(SOME_BYTES);
+    out.close();
+    BlockLocation[] locations = 
+        dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+            Long.MAX_VALUE);
+    assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+  }
+
+  @Test
+  public void testWhenSomeNodesAreNotGood() throws Exception {
+    //make some datanode not "good" so that even if the client prefers it,
+    //the namenode would not give it as a replica to write to
+    DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
+           .getDatanodeManager().getDatanodeByXferAddr(
+               datanodes.get(0).getXferAddress().getAddress().getHostAddress(), 
+               datanodes.get(0).getXferAddress().getPort());
+    //set the decommission status to true so that 
+    //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
+    d.setDecommissioned();
+    InetSocketAddress addrs[] = new InetSocketAddress[3];
+    for (int i = 0; i < 3; i++) {
+      addrs[i] = datanodes.get(i).getXferAddress();
+    }
+    Path p = new Path("/filename-foo-bar-baz");
+    FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
+        4096, (short)3, (long)4096, null, addrs);
+    out.write(SOME_BYTES);
+    out.close();
+    BlockLocation[] locations = 
+        dfs.getClient().getBlockLocations(p.toUri().getPath(), 0, 
+            Long.MAX_VALUE);
+    //reset the state
+    d.stopDecommission();
+    assertTrue(locations.length == 1 && locations[0].getHosts().length == 3);
+    //also make sure that the datanode[0] is not in the list of hosts
+    String datanode0 = 
+        datanodes.get(0).getXferAddress().getAddress().getHostAddress()
+        + ":" + datanodes.get(0).getXferAddress().getPort();
+    for (int i = 0; i < 3; i++) {
+      if (locations[0].getNames()[i].equals(datanode0)) {
+        fail(datanode0 + " not supposed to be a replica for the block");
+      }
+    }
+  }
+
+  private String[] getStringForInetSocketAddrs(InetSocketAddress[] datanode) {
+    String strs[] = new String[datanode.length];
+    for (int i = 0; i < datanode.length; i++) {
+      strs[i] = datanode[i].getAddress().getHostAddress() + ":" + 
+       datanode[i].getPort();
+    }
+    return strs;
+  }
+
+  private boolean compareNodes(String[] dnList1, String[] dnList2) {
+    for (int i = 0; i < dnList1.length; i++) {
+      boolean matched = false;
+      for (int j = 0; j < dnList2.length; j++) {
+        if (dnList1[i].equals(dnList2[j])) {
+          matched = true;
+          break;
+        }
+      }
+      if (matched == false) {
+        fail(dnList1[i] + " not a favored node");
+      }
+    }
+    return true;
+  }
+
+  private InetSocketAddress[] getDatanodes(Random rand) {
+    //Get some unique random indexes
+    int idx1 = rand.nextInt(NUM_DATA_NODES);
+    int idx2;
+    
+    do {
+      idx2 = rand.nextInt(NUM_DATA_NODES);
+    } while (idx1 == idx2);
+    
+    int idx3;
+    do {
+      idx3 = rand.nextInt(NUM_DATA_NODES);
+    } while (idx2 == idx3 || idx1 == idx3);
+    
+    InetSocketAddress[] addrs = new InetSocketAddress[3];
+    addrs[0] = datanodes.get(idx1).getXferAddress();
+    addrs[1] = datanodes.get(idx2).getXferAddress();
+    addrs[2] = datanodes.get(idx3).getXferAddress();
+    return addrs;
+  }
+
+  private InetSocketAddress getArbitraryLocalHostAddr() 
+      throws UnknownHostException{
+    Random rand = new Random(System.currentTimeMillis());
+    int port = rand.nextInt(65535);
+    while (true) {
+      boolean conflict = false;
+      for (DataNode d : datanodes) {
+        if (d.getXferAddress().getPort() == port) {
+          port = rand.nextInt(65535);
+          conflict = true;
+        }
+      }
+      if (conflict == false) {
+        break;
+      }
+    }
+    return new InetSocketAddress(InetAddress.getLocalHost(), port);
+  }
+}