
Merge r1414874 and r1414878 from trunk for HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488848 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 12 years ago
parent
commit
16995501d3
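
The merged change teaches the Balancer about the four-layer (rack + node group) topology. As an illustration only (not part of the commit), a minimal configuration sketch using the same property names the new test below sets; the values are deployment-specific:

    Configuration conf = new HdfsConfiguration();
    // Plug in the 4-layer topology class read by the new NetworkTopology.getInstance().
    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
    // Use the matching block placement policy so replicas respect node groups.
    conf.set("dfs.block.replicator.classname",
        "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");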

+ 16 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java

@@ -29,6 +29,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /** The class represents a cluster of computer with a tree hierarchical
  * network topology.
@@ -53,6 +56,19 @@ public class NetworkTopology {
       super(msg);
     }
   }
+  
+  /**
+   * Get an instance of NetworkTopology based on the value of the configuration
+   * parameter net.topology.impl.
+   * 
+   * @param conf the configuration to be used
+   * @return an instance of NetworkTopology
+   */
+  public static NetworkTopology getInstance(Configuration conf){
+    return ReflectionUtils.newInstance(
+        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+        NetworkTopology.class, NetworkTopology.class), conf);
+  }
 
   /** InnerNode represents a switch/router of a data center or rack.
    * Different from a leaf node, it has non-null children.
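
The new factory method centralizes the reflective construction that DatanodeManager previously did inline (see its hunk further down) and is what the Balancer now calls as well. A usage sketch, assuming the configuration above; without a net.topology.impl override the plain rack-aware NetworkTopology is returned:

    NetworkTopology cluster = NetworkTopology.getInstance(conf);
    // NetworkTopologyWithNodeGroup reports true here, which is how the
    // Balancer decides whether to run its extra same-node-group matching pass.
    boolean nodeGroupAware = cluster.isNodeGroupAware();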

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -43,6 +43,9 @@ Release 2.1.0-beta - UNRELEASED
     HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
     with 4-layer network topology.  (Junping Du via szetszwo)
 
+    HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
+    (Junping Du via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-4222. NN is unresponsive and loses heartbeats from DNs when 

+ 184 - 56
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -169,7 +169,7 @@ import org.apache.hadoop.util.ToolRunner;
  * <ol>
  * <li>The cluster is balanced. Exiting
  * <li>No block can be moved. Exiting...
- * <li>No block has been moved for 3 iterations. Exiting...
+ * <li>No block has been moved for 5 iterations. Exiting...
  * <li>Received an IO exception: failure reason. Exiting...
  * <li>Another balancer is running. Exiting...
  * </ol>
@@ -223,7 +223,7 @@ public class Balancer {
   private Map<String, BalancerDatanode> datanodes
                  = new HashMap<String, BalancerDatanode>();
   
-  private NetworkTopology cluster = new NetworkTopology();
+  private NetworkTopology cluster;
   
   final static private int MOVER_THREAD_POOL_SIZE = 1000;
   final private ExecutorService moverExecutor = 
@@ -250,7 +250,7 @@ public class Balancer {
      * Return true if a block and its proxy are chosen; false otherwise
      */
     private boolean chooseBlockAndProxy() {
-      // iterate all source's blocks until find a good one    
+      // iterate all source's blocks until find a good one
       for (Iterator<BalancerBlock> blocks=
         source.getBlockIterator(); blocks.hasNext();) {
         if (markMovedIfGoodBlock(blocks.next())) {
@@ -294,22 +294,35 @@ public class Balancer {
      * @return true if a proxy is found; otherwise false
      */
     private boolean chooseProxySource() {
-      // check if there is replica which is on the same rack with the target
+      final DatanodeInfo targetDN = target.getDatanode();
+      boolean find = false;
       for (BalancerDatanode loc : block.getLocations()) {
-        if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
-          if (loc.addPendingBlock(this)) {
-            proxySource = loc;
+        // check if there is replica which is on the same rack with the target
+        if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+          find = true;
+          // if cluster is not nodegroup aware or the proxy is on the same 
+          // nodegroup with target, then we already find the nearest proxy
+          if (!cluster.isNodeGroupAware() 
+              || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
             return true;
           }
         }
-      }
-      // find out a non-busy replica
-      for (BalancerDatanode loc : block.getLocations()) {
-        if (loc.addPendingBlock(this)) {
-          proxySource = loc;
-          return true;
+        
+        if (!find) {
+          // find out a non-busy replica out of rack of target
+          find = addTo(loc);
         }
       }
+      
+      return find;
+    }
+    
+    // add a BalancerDatanode as proxy source for specific block movement
+    private boolean addTo(BalancerDatanode bdn) {
+      if (bdn.addPendingBlock(this)) {
+        proxySource = bdn;
+        return true;
+      }
       return false;
     }
     
@@ -687,7 +700,7 @@ public class Balancer {
         NodeTask task = tasks.next();
         BalancerDatanode target = task.getDatanode();
         PendingBlockMove pendingBlock = new PendingBlockMove();
-        if ( target.addPendingBlock(pendingBlock) ) { 
+        if (target.addPendingBlock(pendingBlock)) { 
           // target is not busy, so do a tentative block allocation
           pendingBlock.source = this;
           pendingBlock.target = target;
@@ -788,9 +801,10 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
-        BlockPlacementPolicyDefault.class) {
-      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof 
+        BlockPlacementPolicyDefault)) {
+      throw new UnsupportedActionException(
+          "Balancer without BlockPlacementPolicyDefault");
     }
   }
 
@@ -805,6 +819,7 @@ public class Balancer {
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.nnc = theblockpool;
+    cluster = NetworkTopology.getInstance(conf);
   }
   
   /* Shuffle datanode array */
@@ -915,9 +930,15 @@ public class Balancer {
    * Return total number of bytes to move in this iteration
    */
   private long chooseNodes() {
-    // Match nodes on the same rack first
+    // First, match nodes on the same node group if cluster has nodegroup
+    // awareness
+    if (cluster.isNodeGroupAware()) {
+      chooseNodesOnSameNodeGroup();
+    }
+    
+    // Then, match nodes on the same rack
     chooseNodes(true);
-    // Then match nodes on different racks
+    // At last, match nodes on different racks
     chooseNodes(false);
     
     assert (datanodes.size() >= sources.size()+targets.size())
@@ -932,6 +953,102 @@ public class Balancer {
     }
     return bytesToMove;
   }
+  
+  /**
+   * Decide all <source, target> pairs where source and target are 
+   * on the same NodeGroup
+   */
+  private void chooseNodesOnSameNodeGroup() {
+
+    /* first step: match each overUtilized datanode (source) to
+     * one or more underUtilized datanodes within same NodeGroup(targets).
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
+
+    /* match each remaining overutilized datanode (source) to below average 
+     * utilized datanodes within the same NodeGroup(targets).
+     * Note only overutilized datanodes that haven't had that max bytes to move
+     * satisfied in step 1 are selected
+     */
+    chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
+
+    /* match each remaining underutilized datanode to above average utilized 
+     * datanodes within the same NodeGroup.
+     * Note only underutilized datanodes that have not had that max bytes to
+     * move satisfied in step 1 are selected.
+     */
+    chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
+  }
+  
+  /**
+   * Match two sets of nodes within the same NodeGroup, one should be source
+   * nodes (utilization > Avg), and the other should be destination nodes 
+   * (utilization < Avg).
+   * @param datanodes
+   * @param candidates
+   */
+  private <D extends BalancerDatanode, C extends BalancerDatanode> void 
+      chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
+    for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
+      final D datanode = i.next();
+      for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
+      if (!datanode.isMoveQuotaFull()) {
+        i.remove();
+      }
+    }
+  }
+  
+  /**
+   * Match one datanode with a set of candidates nodes within the same NodeGroup.
+   */
+  private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
+    if (chosen == null) {
+      return false;
+    }
+    if (dn instanceof Source) {
+      matchSourceWithTargetToMove((Source)dn, chosen);
+    } else {
+      matchSourceWithTargetToMove((Source)chosen, dn);
+    }
+    if (!chosen.isMoveQuotaFull()) {
+      candidates.remove();
+    }
+    return true;
+  }
+  
+  private void matchSourceWithTargetToMove(
+      Source source, BalancerDatanode target) {
+    long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+    NodeTask nodeTask = new NodeTask(target, size);
+    source.addNodeTask(nodeTask);
+    target.incScheduledSize(nodeTask.getSize());
+    sources.add(source);
+    targets.add(target);
+    LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+        +source.datanode.getName() + " to " + target.datanode.getName());
+  }
+  
+  /** choose a datanode from <code>candidates</code> within the same NodeGroup 
+   * of <code>dn</code>.
+   */
+  private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
+      BalancerDatanode dn, Iterator<T> candidates) {
+    if (dn.isMoveQuotaFull()) {
+      for(; candidates.hasNext(); ) {
+        final T c = candidates.next();
+        if (!c.isMoveQuotaFull()) {
+          candidates.remove();
+          continue;
+        }
+        if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
+          return c;
+        }
+      }
+    }
+    return null;
+  }
 
   /* if onRack is true, decide all <source, target> pairs
    * where source and target are on the same rack; Otherwise
@@ -942,33 +1059,33 @@ public class Balancer {
     /* first step: match each overUtilized datanode (source) to
      * one or more underUtilized datanodes (targets).
      */
-    chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(underUtilizedDatanodes, onRack);
     
     /* match each remaining overutilized datanode (source) to 
      * below average utilized datanodes (targets).
      * Note only overutilized datanodes that haven't had that max bytes to move
      * satisfied in step 1 are selected
      */
-    chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+    chooseTargets(belowAvgUtilizedDatanodes, onRack);
 
-    /* match each remaining underutilized datanode to 
-     * above average utilized datanodes.
+    /* match each remaining underutilized datanode (target) to 
+     * above average utilized datanodes (source).
      * Note only underutilized datanodes that have not had that max bytes to
      * move satisfied in step 1 are selected.
      */
-    chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+    chooseSources(aboveAvgUtilizedDatanodes, onRack);
   }
    
   /* choose targets from the target candidate list for each over utilized
    * source datanode. OnRackTarget determines if the chosen target 
    * should be on the same rack as the source
    */
-  private void chooseTargets(  
-      Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
+  private void chooseTargets(
+      Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
     for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
         srcIterator.hasNext();) {
       Source source = srcIterator.next();
-      while (chooseTarget(source, targetCandidates, onRackTarget)) {
+      while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
       }
       if (!source.isMoveQuotaFull()) {
         srcIterator.remove();
@@ -982,11 +1099,11 @@ public class Balancer {
    * should be on the same rack as the target
    */
   private void chooseSources(
-      Iterator<Source> sourceCandidates, boolean onRackSource) {
+      Collection<Source> sourceCandidates, boolean onRackSource) {
     for (Iterator<BalancerDatanode> targetIterator = 
       underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
       BalancerDatanode target = targetIterator.next();
-      while (chooseSource(target, sourceCandidates, onRackSource)) {
+      while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
       }
       if (!target.isMoveQuotaFull()) {
         targetIterator.remove();
@@ -1026,23 +1143,15 @@ public class Balancer {
     }
     if (foundTarget) {
       assert(target != null):"Choose a null target";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if (!target.isMoveQuotaFull()) {
         targetCandidates.remove();
       }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
       return true;
     }
     return false;
   }
-  
+
   /* For the given target, choose sources from the source candidate list.
    * OnRackSource determines if the chosen source 
    * should be on the same rack as the target
@@ -1074,18 +1183,10 @@ public class Balancer {
     }
     if (foundSource) {
       assert(source != null):"Choose a null source";
-      long size = Math.min(source.availableSizeToMove(),
-          target.availableSizeToMove());
-      NodeTask nodeTask = new NodeTask(target, size);
-      source.addNodeTask(nodeTask);
-      target.incScheduledSize(nodeTask.getSize());
-      sources.add(source);
-      targets.add(target);
+      matchSourceWithTargetToMove(source, target);
       if ( !source.isMoveQuotaFull()) {
-        sourceCandidates.remove();
-      }
-      LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
-          +source.datanode + " to " + target.datanode);
+          sourceCandidates.remove();
+        }
       return true;
     }
     return false;
@@ -1227,6 +1328,10 @@ public class Balancer {
     if (block.isLocatedOnDatanode(target)) {
       return false;
     }
+    if (cluster.isNodeGroupAware() && 
+        isOnSameNodeGroupWithReplicas(target, block, source)) {
+      return false;
+    }
 
     boolean goodBlock = false;
     if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1258,10 +1363,32 @@ public class Balancer {
     }
     return goodBlock;
   }
-  
+
+  /**
+   * Check if there are any replica (other than source) on the same node group
+   * with target. If true, then target is not a good candidate for placing 
+   * specific block replica as we don't want 2 replicas under the same nodegroup 
+   * after balance.
+   * @param target targetDataNode
+   * @param block dataBlock
+   * @param source sourceDataNode
+   * @return true if there are any replica (other than source) on the same node
+   * group with target
+   */
+  private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+      BalancerBlock block, Source source) {
+    for (BalancerDatanode loc : block.locations) {
+      if (loc != source && 
+        cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+          return true;
+        }
+      }
+    return false;
+  }
+
   /* reset all fields in a balancer preparing for the next iteration */
-  private void resetData() {
-    this.cluster = new NetworkTopology();
+  private void resetData(Configuration conf) {
+    this.cluster = NetworkTopology.getInstance(conf);
     this.overUtilizedDatanodes.clear();
     this.aboveAvgUtilizedDatanodes.clear();
     this.belowAvgUtilizedDatanodes.clear();
@@ -1333,7 +1460,8 @@ public class Balancer {
   }
 
   /** Run an iteration for all datanodes. */
-  private ReturnStatus run(int iteration, Formatter formatter) {
+  private ReturnStatus run(int iteration, Formatter formatter,
+      Configuration conf) {
     try {
       /* get all live datanodes of a cluster and their disk usage
        * decide the number of bytes need to be moved
@@ -1387,7 +1515,7 @@ public class Balancer {
       }
 
       // clean all lists
-      resetData();
+      resetData(conf);
       return ReturnStatus.IN_PROGRESS;
     } catch (IllegalArgumentException e) {
       System.out.println(e + ".  Exiting ...");
@@ -1435,7 +1563,7 @@ public class Balancer {
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
-          final ReturnStatus r = b.run(iteration, formatter);
+          final ReturnStatus r = b.run(iteration, formatter, conf);
           if (r == ReturnStatus.IN_PROGRESS) {
             done = false;
           } else if (r != ReturnStatus.SUCCESS) {
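
Putting the pieces together, a balancing run against such a cluster can be driven the way the new test's runBalancer() helper does. A rough sketch (Balancer.Parameters.DEFALUT is the constant's actual spelling in this version; exception handling omitted):

    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    // Each Balancer instance builds its NetworkTopology from conf via
    // NetworkTopology.getInstance(), so a NodeGroup-aware topology makes
    // chooseNodes() match same-node-group pairs before same-rack pairs.
    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);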

+ 1 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -163,11 +162,7 @@ public class DatanodeManager {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
-    Class<? extends NetworkTopology> networkTopologyClass =
-        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
-            NetworkTopology.class, NetworkTopology.class);
-    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
-        networkTopologyClass, conf);
+    networktopology = NetworkTopology.getInstance(conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -320,7 +320,7 @@ public class MiniDFSCluster {
   /**
    * Used by builder to create and return an instance of MiniDFSCluster
    */
-  private MiniDFSCluster(Builder builder) throws IOException {
+  protected MiniDFSCluster(Builder builder) throws IOException {
     if (builder.nnTopology == null) {
       // If no topology is specified, build a single NN. 
       builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -368,7 +368,7 @@ public class MiniDFSCluster {
 
   private Configuration conf;
   private NameNodeInfo[] nameNodes;
-  private int numDataNodes;
+  protected int numDataNodes;
   protected ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
@@ -2318,7 +2318,7 @@ public class MiniDFSCluster {
     return nameNodes[nnIndex].nameNode;
   }
   
-  private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+  protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
                            boolean checkDataNodeAddrConfig) throws IOException {
     if (setupHostsFile) {
       String hostsFile = conf.get(DFS_HOSTS, "").trim();

+ 228 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java

@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
+
+  private static String[] NODE_GROUPS = null;
+  private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
+  
+  public MiniDFSClusterWithNodeGroup(Builder builder) throws IOException {
+    super(builder);
+  }
+
+  public static void setNodeGroups (String[] nodeGroups) {
+    NODE_GROUPS = nodeGroups;
+  }
+
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, String[] nodeGroups, String[] hosts,
+      long[] simulatedCapacities,
+      boolean setupHostsFile,
+      boolean checkDataNodeAddrConfig,
+      boolean checkDataNodeHostConfig) throws IOException {
+    if (operation == StartupOption.RECOVER) {
+      return;
+    }
+    if (checkDataNodeHostConfig) {
+      conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+    } else {
+      conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+    }
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+
+    int curDatanodesNum = dataNodes.size();
+    // for mincluster's the default initialDelay for BRs is 0
+    if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
+      conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
+    }
+    // If minicluster's name node is null assume that the conf has been
+    // set with the right address:port of the name node.
+    //
+    if (racks != null && numDataNodes > racks.length ) {
+      throw new IllegalArgumentException( "The length of racks [" + racks.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
+      throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    if (hosts != null && numDataNodes > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+    //Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      hosts = new String[numDataNodes];
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+        hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+      }
+    }
+
+    if (simulatedCapacities != null 
+        && numDataNodes > simulatedCapacities.length) {
+      throw new IllegalArgumentException( "The length of simulatedCapacities [" 
+          + simulatedCapacities.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+
+    String [] dnArgs = (operation == null ||
+    operation != StartupOption.ROLLBACK) ?
+        null : new String[] {operation.getName()};
+
+    for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
+      Configuration dnConf = new HdfsConfiguration(conf);
+      // Set up datanode address
+      setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
+      if (manageDfsDirs) {
+        File dir1 = getInstanceStorageDir(i, 0);
+        File dir2 = getInstanceStorageDir(i, 1);
+        dir1.mkdirs();
+        dir2.mkdirs();
+        if (!dir1.isDirectory() || !dir2.isDirectory()) { 
+          throw new IOException("Mkdirs failed to create directory for DataNode "
+              + i + ": " + dir1 + " or " + dir2);
+        }
+        String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
+        dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+        conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+      }
+      if (simulatedCapacities != null) {
+        SimulatedFSDataset.setFactory(dnConf);
+        dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+        simulatedCapacities[i-curDatanodesNum]);
+      }
+      LOG.info("Starting DataNode " + i + " with "
+          + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      if (hosts != null) {
+        dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]);
+        LOG.info("Starting DataNode " + i + " with hostname set to: "
+            + dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY));
+      }
+      if (racks != null) {
+        String name = hosts[i - curDatanodesNum];
+        if (nodeGroups == null) {
+          LOG.info("Adding node with hostname : " + name + " to rack " +
+             racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
+        } else {
+          LOG.info("Adding node with hostname : " + name + " to serverGroup " +
+              nodeGroups[i-curDatanodesNum] + " and rack " +
+              racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] + 
+              nodeGroups[i-curDatanodesNum]);
+        }
+      }
+      Configuration newconf = new HdfsConfiguration(dnConf); // save config
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+      }
+      
+      SecureResources secureResources = null;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, dnConf);
+        try {
+          secureResources = SecureDataNodeStarter.getSecureResources(sslFactory, dnConf);
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        }
+      }
+      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf, secureResources);
+      if(dn == null)
+        throw new IOException("Cannot start DataNode in "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+      //since the HDFS does things based on IP:port, we need to add the mapping
+      //for IP:port to rackId
+      String ipAddr = dn.getXferAddress().getAddress().getHostAddress();
+      if (racks != null) {
+        int port = dn.getXferAddress().getPort();
+        if (nodeGroups == null) {
+          LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
+              " to rack " + racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(ipAddr + ":" + port,
+              racks[i-curDatanodesNum]);
+        } else {
+          LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
+          nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
+          StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] + 
+              nodeGroups[i-curDatanodesNum]);
+        }
+      }
+      dn.runDatanodeDaemon();
+      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources));
+    }
+    curDatanodesNum += numDataNodes;
+    this.numDataNodes += numDataNodes;
+    waitActive();
+  }
+
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, String[] nodeGroups, String[] hosts,
+      long[] simulatedCapacities,
+      boolean setupHostsFile) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups, 
+        hosts, simulatedCapacities, setupHostsFile, false, false);
+  }
+
+  public void startDataNodes(Configuration conf, int numDataNodes, 
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, long[] simulatedCapacities,
+      String[] nodeGroups) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
+        null, simulatedCapacities, false);
+  }
+
+  // This is for initialize from parent class.
+  @Override
+  public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
+      boolean manageDfsDirs, StartupOption operation, 
+      String[] racks, String[] hosts,
+      long[] simulatedCapacities,
+      boolean setupHostsFile,
+      boolean checkDataNodeAddrConfig,
+      boolean checkDataNodeHostConfig) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, 
+        NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile, 
+        checkDataNodeAddrConfig, checkDataNodeHostConfig);
+  }
+
+}

+ 290 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java

@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Test;
+
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancerWithNodeGroup extends TestCase {
+  private static final Log LOG = LogFactory.getLog(
+  "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
+  
+  final private static long CAPACITY = 500L;
+  final private static String RACK0 = "/rack0";
+  final private static String RACK1 = "/rack1";
+  final private static String NODEGROUP0 = "/nodegroup0";
+  final private static String NODEGROUP1 = "/nodegroup1";
+  final private static String NODEGROUP2 = "/nodegroup2";
+  final static private String fileName = "/tmp.txt";
+  final static private Path filePath = new Path(fileName);
+  MiniDFSClusterWithNodeGroup cluster;
+
+  ClientProtocol client;
+
+  static final long TIMEOUT = 20000L; //msec
+  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
+  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
+  static final int DEFAULT_BLOCK_SIZE = 10;
+
+  static {
+    Balancer.setBlockMoveWaitTime(1000L) ;
+  }
+
+  static Configuration createConf() {
+    Configuration conf = new HdfsConfiguration();
+    TestBalancer.initConf(conf);
+    conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, 
+        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+    conf.set("dfs.block.replicator.classname", 
+        "org.apache.hadoop.hdfs.server.blockmanagement." +
+        "BlockPlacementPolicyWithNodeGroup");
+    return conf;
+  }
+
+  /**
+   * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, 
+   * summed over all nodes.  Times out after TIMEOUT msec.
+   * @param expectedUsedSpace
+   * @param expectedTotalSpace
+   * @throws IOException - if getStats() fails
+   * @throws TimeoutException
+   */
+  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+             : System.currentTimeMillis() + timeout;
+
+    while (true) {
+      long[] status = client.getStats();
+      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 
+          / expectedTotalSpace;
+      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 
+          / expectedUsedSpace;
+      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 
+          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+        break; //done
+
+      if (System.currentTimeMillis() > failtime) {
+        throw new TimeoutException("Cluster failed to reached expected values of "
+            + "totalSpace (current: " + status[0] 
+            + ", expected: " + expectedTotalSpace 
+            + "), or usedSpace (current: " + status[1] 
+            + ", expected: " + expectedUsedSpace
+            + "), in more than " + timeout + " msec.");
+      }
+      try {
+        Thread.sleep(100L);
+      } catch(InterruptedException ignored) {
+      }
+    }
+  }
+
+  /**
+   * Wait until balanced: each datanode gives utilization within 
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  private void waitForBalancer(long totalUsedSpace, long totalCapacity) 
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + timeout;
+    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = 
+          client.getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      for (DatanodeInfo datanode : datanodeReport) {
+        double nodeUtilization = ((double)datanode.getDfsUsed())
+            / datanode.getCapacity();
+        if (Math.abs(avgUtilization - nodeUtilization) >
+            BALANCE_ALLOWED_VARIANCE) {
+          balanced = false;
+          if (System.currentTimeMillis() > failtime) {
+            throw new TimeoutException(
+                "Rebalancing expected avg utilization to become "
+                + avgUtilization + ", but on datanode " + datanode
+                + " it remains at " + nodeUtilization
+                + " after more than " + TIMEOUT + " msec.");
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while (!balanced);
+  }
+
+  private void runBalancer(Configuration conf,
+      long totalUsedSpace, long totalCapacity) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+
+    waitForHeartBeat(totalUsedSpace, totalCapacity);
+    LOG.info("Rebalancing with default factor.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
+  }
+
+  /**
+   * Create a cluster with even distribution, and a new empty node is added to
+   * the cluster, then test rack locality for balancer policy. 
+   */
+  @Test
+  public void testBalancerWithRackLocality() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+                                .numDataNodes(capacities.length)
+                                .racks(racks)
+                                .simulatedCapacities(capacities);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(builder);
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, 
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+
+      long totalCapacity = TestBalancer.sum(capacities);
+
+      // fill up the cluster to be 30% full
+      long totalUsedSpace = totalCapacity * 3 / 10;
+      TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+          (short) numOfDatanodes, 0);
+      
+      long newCapacity = CAPACITY;
+      String newRack = RACK1;
+      String newNodeGroup = NODEGROUP2;
+      // start up an empty node with the same capacity and on the same rack
+      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+          new long[] {newCapacity}, new String[]{newNodeGroup});
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancer(conf, totalUsedSpace, totalCapacity);
+      
+      DatanodeInfo[] datanodeReport = 
+              client.getDatanodeReport(DatanodeReportType.ALL);
+      
+      Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
+      for (DatanodeInfo datanode: datanodeReport) {
+        String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
+        int usedCapacity = (int) datanode.getDfsUsed();
+         
+        if (rackToUsedCapacity.get(rack) != null) {
+          rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
+        } else {
+          rackToUsedCapacity.put(rack, usedCapacity);
+        }
+      }
+      assertEquals(rackToUsedCapacity.size(), 2);
+      assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+      
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Create a cluster with even distribution, and a new empty node is added to
+   * the cluster, then test node-group locality for balancer policy.
+   */
+  @Test
+  public void testBalancerWithNodeGroup() throws Exception {
+    Configuration conf = createConf();
+    long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+    String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+    String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+    
+    int numOfDatanodes = capacities.length;
+    assertEquals(numOfDatanodes, racks.length);
+    assertEquals(numOfDatanodes, nodeGroups.length);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+                                .numDataNodes(capacities.length)
+                                .racks(racks)
+                                .simulatedCapacities(capacities);
+    MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+    cluster = new MiniDFSClusterWithNodeGroup(builder);
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, 
+          cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+
+      long totalCapacity = TestBalancer.sum(capacities);
+      // fill up the cluster to be 20% full
+      long totalUsedSpace = totalCapacity * 2 / 10;
+      TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
+          (short) (numOfDatanodes/2), 0);
+      
+      long newCapacity = CAPACITY;
+      String newRack = RACK1;
+      String newNodeGroup = NODEGROUP2;
+      // start up an empty node with the same capacity and on NODEGROUP2
+      cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+          new long[] {newCapacity}, new String[]{newNodeGroup});
+
+      totalCapacity += newCapacity;
+
+      // run balancer and validate results
+      runBalancer(conf, totalUsedSpace, totalCapacity);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}