
HADOOP-692. Add rack awareness to HDFS's placement of blocks. Contributed by Hairong.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@502821 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent commit
26adf43c5b

+ 3 - 0
CHANGES.txt

@@ -146,6 +146,9 @@ Trunk (unreleased changes)
 45. HADOOP-309.  Fix two NullPointerExceptions in StatusHttpServer.
     (navychen via cutting)
 
+46. HADOOP-692.  Add rack awareness to HDFS's placement of blocks.
+    (Hairong Kuang via cutting)
+
 
 Release 0.10.1 - 2007-01-10
 

+ 8 - 0
conf/hadoop-default.xml

@@ -346,6 +346,14 @@ creations/deletions), or "all".</description>
  	</description>
 </property>
 
+<property>
+  <name>dfs.network.script</name>
+  <value></value>
+  <description>
+        Specifies the name of a script that prints the network location
+        path of the current machine.
+  </description>
+</property>
 
 <!-- map/reduce properties -->
 

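As an illustration (not part of the patch), here is a minimal standalone Java sketch of how the dfs.network.script output is consumed, mirroring DataNode.getNetworkLoc below; the script name print-rack.sh is hypothetical, and the script is expected to print a path such as /datacenter1/rack1 on standard output:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class RackScriptDemo {
        public static void main(String[] args) throws IOException, InterruptedException {
            // run the hypothetical rack script and collect its standard output
            Process p = Runtime.getRuntime().exec("print-rack.sh");
            BufferedReader in = new BufferedReader(new InputStreamReader(p.getInputStream()));
            StringBuilder loc = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                loc.append(line);
            }
            // a nonzero exit code means the location cannot be trusted
            if (p.waitFor() != 0) {
                throw new IOException("network script exited with nonzero status");
            }
            System.out.println("network location: " + loc);
        }
    }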
+ 7 - 20
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -427,30 +427,17 @@ class DFSClient implements FSConstants {
 
     /**
      * Pick the best node from which to stream the data.
-     * That's the local one, if available.
+     * Entries in <i>nodes</i> are already in priority order.
      */
     private DatanodeInfo bestNode(DatanodeInfo nodes[], TreeSet deadNodes) throws IOException {
-        if ((nodes == null) || 
-            (nodes.length - deadNodes.size() < 1)) {
-            throw new IOException("No live nodes contain current block");
-        }
-        DatanodeInfo chosenNode = null;
+      if (nodes != null) { 
         for (int i = 0; i < nodes.length; i++) {
-            if (deadNodes.contains(nodes[i])) {
-                continue;
-            }
-            String nodename = nodes[i].getHost();
-            if (localName.equals(nodename)) {
-                chosenNode = nodes[i];
-                break;
-            }
-        }
-        if (chosenNode == null) {
-            do {
-                chosenNode = nodes[Math.abs(r.nextInt()) % nodes.length];
-            } while (deadNodes.contains(chosenNode));
+          if (!deadNodes.contains(nodes[i])) {
+            return nodes[i];
+          }
         }
-        return chosenNode;
+      }
+        throw new IOException("No live nodes contain current block");
     }
 
     /***************************************************************

+ 132 - 10
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -24,10 +24,12 @@ import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.metrics.Metrics;
 import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetworkTopology;
 
 import java.io.*;
 import java.net.*;
@@ -105,6 +107,7 @@ public class DataNode implements FSConstants, Runnable {
     DatanodeProtocol namenode;
     FSDataset data;
     DatanodeRegistration dnRegistration;
+    private String networkLoc;
     boolean shouldRun = true;
     Vector receivedBlockList = new Vector();
     int xmitsInProgress = 0;
@@ -168,9 +171,15 @@ public class DataNode implements FSConstants, Runnable {
      * 'dataDirs' is where the blocks are stored.
      */
     DataNode(Configuration conf, String[] dataDirs) throws IOException {
-        this(InetAddress.getLocalHost().getHostName(), 
+      this(conf, NetworkTopology.DEFAULT_RACK, dataDirs );
+    }
+    
+    DataNode(Configuration conf, String networkLoc, String[] dataDirs) throws IOException {
+        this(InetAddress.getLocalHost().getHostName(),
+             networkLoc,
              dataDirs,
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
+        // register datanode
         int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
         String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort, true);
@@ -197,7 +206,8 @@ public class DataNode implements FSConstants, Runnable {
      * 
      * @see DataStorage
      */
-    private DataNode(String machineName, 
+    private DataNode(String machineName,
+                    String networkLoc,
                     String[] dataDirs, 
                     InetSocketAddress nameNodeAddr, 
                     Configuration conf ) throws IOException {
@@ -247,6 +257,7 @@ public class DataNode implements FSConstants, Runnable {
                                         storage.getStorageID(),
                                         -1,
                                         "" );
+      this.networkLoc = networkLoc;
       // initialize data node internal structure
       this.data = new FSDataset(volumes, conf);
       this.dataXceiveServer = new Daemon(new DataXceiveServer(ss));
@@ -292,7 +303,7 @@ public class DataNode implements FSConstants, Runnable {
     private void register() throws IOException {
       while( true ) {
         try {
-          dnRegistration = namenode.register( dnRegistration );
+          dnRegistration = namenode.register( dnRegistration, networkLoc );
           break;
         } catch( SocketTimeoutException e ) {  // namenode is busy
           LOG.info("Problem connecting to server: " + getNameNodeAddr());
@@ -1045,9 +1056,9 @@ public class DataNode implements FSConstants, Runnable {
     
     /** Start datanode daemon.
      */
-    public static void run(Configuration conf) throws IOException {
+    public static void run(Configuration conf, String networkLoc) throws IOException {
         String[] dataDirs = conf.getStrings("dfs.data.dir");
-        DataNode dn = makeInstance(dataDirs, conf);
+        DataNode dn = makeInstance(networkLoc, dataDirs, conf);
         dataNodeList.add(dn);
         if (dn != null) {
           Thread t = new Thread(dn, "DataNode: [" +
@@ -1075,8 +1086,9 @@ public class DataNode implements FSConstants, Runnable {
   /** Start a single datanode daemon and wait for it to finish.
    *  If this thread is specifically interrupted, it will stop waiting.
    */
-  private static void runAndWait(Configuration conf) throws IOException {
-    run(conf);
+  private static void runAndWait(Configuration conf, String networkLoc)
+    throws IOException {
+    run(conf, networkLoc);
     if (dataNodeThreadList.size() > 0) {
       Thread t = (Thread) dataNodeThreadList.remove(dataNodeThreadList.size()-1);
       try {
@@ -1101,7 +1113,12 @@ public class DataNode implements FSConstants, Runnable {
    * no directory from this directory list can be created.
    * @throws IOException
    */
-  static DataNode makeInstance(String[] dataDirs, Configuration conf)
+  static DataNode makeInstance( String[] dataDirs, Configuration conf)
+  throws IOException {
+    return makeInstance(NetworkTopology.DEFAULT_RACK, dataDirs, conf );
+  }
+  
+  static DataNode makeInstance(String networkLoc, String[] dataDirs, Configuration conf)
   throws IOException {
     ArrayList<String> dirs = new ArrayList<String>();
     for (int i = 0; i < dataDirs.length; i++) {
@@ -1113,7 +1130,7 @@ public class DataNode implements FSConstants, Runnable {
         LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage() );
       }
     }
-    return ((dirs.size() > 0) ? new DataNode(conf, dirs.toArray(new String[dirs.size()])) : null);
+    return ((dirs.size() > 0) ? new DataNode(conf, networkLoc, dirs.toArray(new String[dirs.size()])) : null);
   }
 
   public String toString() {
@@ -1124,13 +1141,118 @@ public class DataNode implements FSConstants, Runnable {
         ", xmitsInProgress=" + xmitsInProgress +
         "}";
   }
+  
+    /* Get the network location by running a script configured in conf */
+    private static String getNetworkLoc( Configuration conf ) 
+                          throws IOException {
+        String locScript = conf.get("dfs.network.script" );
+        if( locScript == null ) return null;
+
+        LOG.info( "Starting to run script to get datanode network location");
+        Process p = Runtime.getRuntime().exec( locScript );
+        StringBuffer networkLoc = new StringBuffer();
+        final BufferedReader inR = new BufferedReader(
+                new InputStreamReader(p.getInputStream() ) );
+        final BufferedReader errR = new BufferedReader(
+                new InputStreamReader( p.getErrorStream() ) );
+
+        // read & log any error messages from the running script
+        Thread errThread = new Thread() {
+            public void run() {
+                try {
+                    String errLine = errR.readLine();
+                    while(errLine != null) {
+                        LOG.warn("Network script error: "+errLine);
+                        errLine = errR.readLine();
+                    }
+                } catch( IOException e) {
+                    // nothing to do; the stream closes when the process exits
+                }
+            }
+        };
+        try {
+            errThread.start();
+            
+            // fetch output from the process
+            String line = inR.readLine();
+            while( line != null ) {
+                networkLoc.append( line );
+                line = inR.readLine();
+            }
+            try {
+                // wait for the process to finish
+                int returnVal = p.waitFor();
+                // check the exit code
+                if( returnVal != 0 ) {
+                    throw new IOException("Process exited with nonzero status: "+locScript);
+                }
+            } catch (InterruptedException e) {
+                throw new IOException( e.getMessage() );
+            } finally {
+                try {
+                    // make sure that the error thread exits
+                    errThread.join();
+                } catch (InterruptedException je) {
+                    LOG.warn( StringUtils.stringifyException(je));
+                }
+            }
+        } finally {
+            // close in & error streams
+            try {
+                inR.close();
+            } catch ( IOException ine ) {
+                throw ine;
+            } finally {
+                errR.close();
+            }
+        }
+
+        return networkLoc.toString();
+    }
+
+
+    /* Get the network location from the command line */
+    private static String getNetworkLoc(String args[]) {
+        for( int i=0; i< args.length; i++ ) { 
+            if ("-r".equals(args[i])||"--rack".equals(args[i]) ) {
+                if( i==args.length-1 ) {
+                    printUsage();
+                } else {
+                    return args[++i];
+                }
+            }
+        }
+        return null;
+    }
+    
+    /* Return the datanode's network location 
+     * either from the command line, from script, or a default value
+     */
+    private static String getNetworkLoc(String args[], Configuration conf)
+                          throws IOException {
+        String networkLoc = getNetworkLoc( args );
+        if( networkLoc == null ) {
+            networkLoc = getNetworkLoc( conf );
+        }
+        if( networkLoc == null ) {
+            return NetworkTopology.DEFAULT_RACK;
+        } else {
+            return NodeBase.normalize( networkLoc );
+        }
+    }
+    
+    private static void printUsage() {
+        System.err.println(
+                "Usage: java DataNode [-r, --rack <network location>]");        
+    }
+
 
     /**
      */
     public static void main(String args[]) throws IOException {
       try {
         Configuration conf = new Configuration();
-        runAndWait(conf);
+        runAndWait(conf, getNetworkLoc(args, conf));
       } catch ( Throwable e ) {
         LOG.error( StringUtils.stringifyException( e ) );
         System.exit(-1);

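As a usage sketch, the rack may also be supplied on the command line, matching the printUsage string above (the classpath setup is assumed):

    java org.apache.hadoop.dfs.DataNode --rack /rack1

When neither --rack nor dfs.network.script yields a location, the datanode falls back to NetworkTopology.DEFAULT_RACK, as getNetworkLoc(String[], Configuration) above shows.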
+ 43 - 6
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -19,6 +19,9 @@ package org.apache.hadoop.dfs;
 
 import java.util.*;
 
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+
 /**************************************************
  * DatanodeDescriptor tracks stats on a given DataNode,
  * such as available storage capacity, last update time, etc.,
@@ -32,25 +35,42 @@ import java.util.*;
  * @author Mike Cafarella
  * @author Konstantin Shvachko
  **************************************************/
-class DatanodeDescriptor extends DatanodeInfo {
+public class DatanodeDescriptor extends DatanodeInfo {
 
   private volatile Collection<Block> blocks = new TreeSet<Block>();
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   protected boolean isAlive = false;
 
-  DatanodeDescriptor() {
+  /** Default constructor */
+  public DatanodeDescriptor() {
     super();
   }
   
-  DatanodeDescriptor( DatanodeID nodeID ) {
+  /** DatanodeDescriptor constructor
+   * @param nodeID id of the data node
+   */
+  public DatanodeDescriptor( DatanodeID nodeID ) {
     this( nodeID, 0L, 0L, 0 );
   }
+
+  /** DatanodeDescriptor constructor
+   * 
+   * @param nodeID id of the data node
+   * @param networkLocation location of the data node in network
+   */
+  public DatanodeDescriptor( DatanodeID nodeID, String networkLocation ) {
+    this( nodeID, networkLocation, 0L, 0L, 0 );
+  }
   
-  /**
-   * Create DatanodeDescriptor.
+  /** DatanodeDescriptor constructor
+   * 
+   * @param nodeID id of the data node
+   * @param capacity capacity of the data node
+   * @param remaining remaining capacity of the data node
+   * @param xceiverCount # of data transfers at the data node
    */
-  DatanodeDescriptor( DatanodeID nodeID, 
+  public DatanodeDescriptor( DatanodeID nodeID, 
                       long capacity, 
                       long remaining,
                       int xceiverCount ) {
@@ -58,6 +78,23 @@ class DatanodeDescriptor extends DatanodeInfo {
     updateHeartbeat(capacity, remaining, xceiverCount);
   }
 
+  /** DatanodeDescriptor constructor
+   * 
+   * @param nodeID id of the data node
+   * @param networkLocation location of the data node in network
+   * @param capacity capacity of the data node
+   * @param remaining remaining capacity of the data node
+   * @param xceiverCount # of data transfers at the data node
+   */
+  public DatanodeDescriptor( DatanodeID nodeID,
+                              String networkLocation,
+                              long capacity, 
+                              long remaining,
+                              int xceiverCount ) {
+    super( nodeID, networkLocation );
+    updateHeartbeat( capacity, remaining, xceiverCount);
+  }
+
   /**
    */
   void updateBlocks(Block newBlocks[]) {

+ 35 - 7
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -23,11 +23,15 @@ import java.io.IOException;
 import java.util.Date;
 
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 
 /** 
  * DatanodeInfo represents the status of a DataNode.
@@ -37,11 +41,12 @@ import org.apache.hadoop.io.WritableUtils;
  * @author Mike Cafarella
  * @author Konstantin Shvachko
  */
-public class DatanodeInfo extends DatanodeID {
+public class DatanodeInfo extends DatanodeID implements Node {
   protected long capacity;
   protected long remaining;
   protected long lastUpdate;
   protected int xceiverCount;
+  private String location = NetworkTopology.DEFAULT_RACK;
 
   // administrative states of a datanode
   public enum AdminStates {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
@@ -59,16 +64,22 @@ public class DatanodeInfo extends DatanodeID {
     this.remaining = from.getRemaining();
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
+    this.location = from.getNetworkLocation();
     this.adminState = from.adminState;
   }
 
   DatanodeInfo( DatanodeID nodeID ) {
-    super( nodeID );
-    this.capacity = 0L;
-    this.remaining = 0L;
-    this.lastUpdate = 0L;
-    this.xceiverCount = 0;
-    this.adminState = null;
+      super( nodeID );
+      this.capacity = 0L;
+      this.remaining = 0L;
+      this.lastUpdate = 0L;
+      this.xceiverCount = 0;
+      this.adminState = null;    
+  }
+  
+  DatanodeInfo( DatanodeID nodeID, String location ) {
+      this(nodeID);
+      this.location = location;
   }
   
   /** The raw capacity. */
@@ -103,6 +114,18 @@ public class DatanodeInfo extends DatanodeID {
     this.xceiverCount = xceiverCount; 
   }
 
+  /** rack name **/
+  public String getNetworkLocation() {return location;}
+    
+  /** Sets the rack name */
+  void setNetworkLocation(String location) {
+    this.location = NodeBase.normalize(location);
+  }
+  
+  public String getPath() {
+      return location+NodeBase.PATH_SEPARATOR_STR+name;
+  }
+
   /** A formatted string for reporting the status of the DataNode. */
   public String getDatanodeReport() {
     StringBuffer buffer = new StringBuffer();
@@ -110,6 +133,9 @@ public class DatanodeInfo extends DatanodeID {
     long r = getRemaining();
     long u = c - r;
     buffer.append("Name: "+name+"\n");
+    if(!NetworkTopology.DEFAULT_RACK.equals(location)) {
+        buffer.append("Rack: "+location+"\n");
+    }
     if (isDecommissioned()) {
       buffer.append("State          : Decommissioned\n");
     } else if (isDecommissionInProgress()) {
@@ -209,6 +235,7 @@ public class DatanodeInfo extends DatanodeID {
     out.writeLong(remaining);
     out.writeLong(lastUpdate);
     out.writeInt(xceiverCount);
+    Text.writeString( out, location );
     WritableUtils.writeEnum(out, getAdminState());
   }
 
@@ -220,6 +247,7 @@ public class DatanodeInfo extends DatanodeID {
     this.remaining = in.readLong();
     this.lastUpdate = in.readLong();
     this.xceiverCount = in.readInt();
+    this.location = Text.readString( in );
     AdminStates newState = (AdminStates) WritableUtils.readEnum(in,
                                          AdminStates.class);
     setAdminState(newState);

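A note on the new field: getPath() joins the rack and node name into a slash-separated path, e.g. /rack1/host:50010 (the host:port here is illustrative), and the location string is now written and read between xceiverCount and the admin state in the Writable methods.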
+ 4 - 4
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -31,8 +31,7 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
-  public static final long versionID = 4L; // BlockCommand.action:
-                                           // replace DNA_REPORT by DNA_REGISTER
+  public static final long versionID = 5L;  // register takes a new parameter
   
   // error code
   final static int DISK_ERROR = 1;
@@ -52,13 +51,14 @@ interface DatanodeProtocol extends VersionedProtocol {
    * Register Datanode.
    *
    * @see org.apache.hadoop.dfs.DataNode#register()
-   * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration)
+   * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration, String)
    * 
    * @return updated {@link org.apache.hadoop.dfs.DatanodeRegistration}, which contains 
    * new storageID if the datanode did not have one and
    * registration ID for further communication.
    */
-    public DatanodeRegistration register( DatanodeRegistration registration
+    public DatanodeRegistration register( DatanodeRegistration registration,
+                                          String networkLocation
                                         ) throws IOException;
     /**
      * sendHeartbeat() tells the NameNode that the DataNode is still

+ 529 - 111
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -23,11 +23,13 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.fs.Path;
 
 import java.io.*;
-import java.net.InetSocketAddress;
 import java.util.*;
+
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
@@ -189,6 +191,11 @@ class FSNamesystem implements FSConstants {
     private String localMachine;
     private int port;
     private SafeModeInfo safeMode;  // safe mode information
+    
+    // datanode network topology
+    NetworkTopology clusterMap = new NetworkTopology();
+    // for block replicas placement
+    Replicator replicator = new Replicator();
 
     /**
     * dirs is a list of directories where the filesystem directory state 
@@ -497,10 +504,12 @@ class FSNamesystem implements FSConstants {
                     machineSets[i] = new DatanodeDescriptor[0];
                 } else {
                     machineSets[i] = new DatanodeDescriptor[containingNodes.size()];
-                    int j = 0;
-                    for (Iterator<DatanodeDescriptor> it = containingNodes.iterator(); it.hasNext(); j++) {
-                        machineSets[i][j] = it.next();
-                    }
+                    ArrayList<DatanodeDescriptor> containingNodesList =
+                      new ArrayList<DatanodeDescriptor>(containingNodes.size());
+                    containingNodesList.addAll(containingNodes);
+                    
+                    machineSets[i] = replicator.sortByDistance(
+                        getDatanodeByHost(clientMachine), containingNodesList);
                 }
             }
 
@@ -671,9 +680,9 @@ class FSNamesystem implements FSConstants {
           }
         }
 
-        // Get the array of replication targets 
-        DatanodeDescriptor targets[] = chooseTargets(replication, null, 
-                                               clientMachine, blockSize);
+        // Get the array of replication targets
+        DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
+            getDatanodeByHost(clientMachine.toString()), null, blockSize);
         if (targets.length < this.minReplication) {
             throw new IOException("failed to create file "+src
                     +" on client " + clientMachine
@@ -754,9 +763,13 @@ class FSNamesystem implements FSConstants {
           throw new NotReplicatedYetException("Not replicated yet");
         }
         
-        // Get the array of replication targets 
-        DatanodeDescriptor targets[] = chooseTargets(pendingFile.getReplication(), 
-            null, pendingFile.getClientMachine(), pendingFile.getBlockSize());
+        // Get the array of replication targets
+        String clientHost = pendingFile.getClientMachine().toString();
+        DatanodeDescriptor targets[] = replicator.chooseTarget(
+            (int)(pendingFile.getReplication()),
+            getDatanodeByHost(clientHost),
+            null,
+            pendingFile.getBlockSize());
         if (targets.length < this.minReplication) {
           throw new IOException("File " + src + " could only be replicated to " +
                                 targets.length + " nodes, instead of " +
@@ -1422,7 +1435,8 @@ class FSNamesystem implements FSConstants {
      * @see DataNode#register()
      * @author Konstantin Shvachko
      */
-    public synchronized void registerDatanode( DatanodeRegistration nodeReg 
+    public synchronized void registerDatanode( DatanodeRegistration nodeReg,
+                                               String networkLocation
                                               ) throws IOException {
       NameNode.stateChangeLog.info(
           "BLOCK* NameSystem.registerDatanode: "
@@ -1434,6 +1448,8 @@ class FSNamesystem implements FSConstants {
       DatanodeDescriptor nodeN = getDatanodeByName( nodeReg.getName() );
       
       if( nodeN != null && nodeN != nodeS ) {
+          NameNode.LOG.info( "BLOCK* NameSystem.registerDatanode: "
+                  + "node from name: " + nodeN.getName() );
         // nodeN previously served a different data storage, 
         // which is not served by anybody anymore.
         removeDatanode( nodeN );
@@ -1457,11 +1473,15 @@ class FSNamesystem implements FSConstants {
           // data storage, which from now on will be served by a new node.
           NameNode.stateChangeLog.debug(
             "BLOCK* NameSystem.registerDatanode: "
-            + "node " + nodeS.name
+            + "node " + nodeS.getName()
             + " is replaced by " + nodeReg.getName() + "." );
         }
         getEditLog().logRemoveDatanode( nodeS );
+        // update cluster map
+        clusterMap.remove( nodeS );
         nodeS.updateRegInfo( nodeReg );
+        nodeS.setNetworkLocation( networkLocation );
+        clusterMap.add( nodeS );
         getEditLog().logAddDatanode( nodeS );
         
         // also treat the registration message as a heartbeat
@@ -1484,7 +1504,8 @@ class FSNamesystem implements FSConstants {
             + "new storageID " + nodeReg.getStorageID() + " assigned." );
       }
       // register new datanode
-      DatanodeDescriptor nodeDescr = new DatanodeDescriptor( nodeReg );
+      DatanodeDescriptor nodeDescr 
+                  = new DatanodeDescriptor( nodeReg, networkLocation );
       unprotectedAddDatanode( nodeDescr );
       getEditLog().logAddDatanode( nodeDescr );
       
@@ -1636,6 +1657,7 @@ class FSNamesystem implements FSConstants {
           removeStoredBlock(it.next(), nodeInfo);
       }
       unprotectedRemoveDatanode(nodeInfo);
+      clusterMap.remove(nodeInfo);
     }
 
     void unprotectedRemoveDatanode( DatanodeDescriptor nodeDescr ) {
@@ -1649,6 +1671,7 @@ class FSNamesystem implements FSConstants {
     
     void unprotectedAddDatanode( DatanodeDescriptor nodeDescr ) {
       datanodeMap.put( nodeDescr.getStorageID(), nodeDescr );
+      clusterMap.add(nodeDescr);
       NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.unprotectedAddDatanode: "
           + "node " + nodeDescr.getName() + " is added to datanodeMap." );
@@ -1661,7 +1684,8 @@ class FSNamesystem implements FSConstants {
      * @param nodeID node
      */
     void wipeDatanode( DatanodeID nodeID ) {
-      datanodeMap.remove(nodeID.getStorageID());
+      String key = nodeID.getStorageID();
+      datanodeMap.remove(key);
       NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.wipeDatanode: "
           + nodeID.getName() + " storage " + nodeID.getStorageID() 
@@ -2008,6 +2032,7 @@ class FSNamesystem implements FSConstants {
      * Total raw bytes.
      */
     public long totalCapacity() {
+
       synchronized (heartbeats) {
         return totalCapacity;
       }
@@ -2294,6 +2319,23 @@ class FSNamesystem implements FSConstants {
       return count;
     }
 
+    /*
+     * Filter nodes that are marked for decommission in the given list.
+     * Return a list of non-decommissioned nodes
+     */
+    private List<DatanodeDescriptor> filterDecommissionedNodes(
+        Collection<DatanodeDescriptor> nodelist) {
+      List<DatanodeDescriptor> nonCommissionedNodeList =
+        new ArrayList<DatanodeDescriptor>();
+      for (Iterator<DatanodeDescriptor> it = nodelist.iterator(); 
+           it.hasNext(); ) {
+        DatanodeDescriptor node = it.next();
+        if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+          nonCommissionedNodeList.add(node);
+        }
+      }
+      return nonCommissionedNodeList;
+    }
     /*
      * Return true if there are any blocks in neededReplication that 
      * reside on the specified node. Otherwise returns false.
@@ -2395,14 +2437,15 @@ class FSNamesystem implements FSConstants {
             // not be scheduled for removal on that node
             if (containingNodes != null && containingNodes.contains(srcNode)
                 && (excessBlocks == null || ! excessBlocks.contains(block))) {
-
               // filter out containingNodes that are marked for decommission.
-              int numCurrentReplica = countContainingNodes(containingNodes);
-
-              DatanodeDescriptor targets[] = chooseTargets(
+              List<DatanodeDescriptor> nodes = 
+                filterDecommissionedNodes(containingNodes);
+              int numCurrentReplica = nodes.size();
+              DatanodeDescriptor targets[] = replicator.chooseTarget(
                   Math.min( fileINode.getReplication() - numCurrentReplica,
-                            this.maxReplicationStreams - xmitsInProgress), 
-                  containingNodes, null, blockSize);
+                            this.maxReplicationStreams - xmitsInProgress),
+                  datanodeMap.get(srcNode.getStorageID()),
+                  nodes, null, blockSize);
               if (targets.length > 0) {
                 // Build items to return
                 replicateBlocks.add(block);
@@ -2471,110 +2514,470 @@ class FSNamesystem implements FSConstants {
       return results;
     }
   }
-
-    /**
-     * Get a certain number of targets, if possible.
-     * If not, return as many as we can.
-     * Only live nodes contained in {@link #heartbeats} are 
-     * targeted for replication.
-     * 
-     * @param desiredReplicates
-     *          number of duplicates wanted.
-     * @param forbiddenNodes
-     *          of DatanodeDescriptor instances that should not be considered targets.
-     * @return array of DatanodeDescriptor instances uses as targets.
-     */
-    DatanodeDescriptor[] chooseTargets(
-                            int desiredReplicates, 
-                            Collection<DatanodeDescriptor> forbiddenNodes,
-                            UTF8 clientMachine, 
-                            long blockSize) {
-        Collection<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>();
+  
+    /** The class is responsible for choosing the desired number of targets
+     * for placing block replicas.
+     * The replica placement strategy is that if the writer is on a datanode,
+     * the 1st replica is placed on the local machine, 
+     * otherwise a random datanode. The 2nd replica is placed on a datanode
+     * that is on a different rack. The 3rd replica is placed on a datanode
+     * which is on the same rack as the first replica.
+     * @author hairong
+     *
+     */
+    class Replicator {
+      private class NotEnoughReplicasException extends Exception {
+        NotEnoughReplicasException( String msg ) {
+          super( msg );
+        }
+      }
+      
+      /**
+       * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
+       * a block with size <i>blocksize</i>.
+       * If that many are not available, return as many as we can.
+       * 
+       * @param numOfReplicas: number of replicas wanted.
+       * @param writer: the writer's machine, null if not in the cluster.
+       * @param excludedNodes: datanodes that should not be considered targets.
+       * @param blocksize: size of the data to be written.
+       * @return array of DatanodeDescriptor instances chosen as targets
+       * and sorted as a pipeline.
+       */
+      DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+          DatanodeDescriptor writer,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize ) {
+        if( excludedNodes == null) {
+          excludedNodes = new ArrayList<DatanodeDescriptor>();
+        }
         
-        if (desiredReplicates > heartbeats.size()) {
-          LOG.warn("Replication requested of "+desiredReplicates
-                      +" is larger than cluster size ("+heartbeats.size()
-                      +"). Using cluster size.");
-          desiredReplicates  = heartbeats.size();
-          if (desiredReplicates == 0) {
-            LOG.warn("While choosing target, totalMachines is " + desiredReplicates);
-          }
+        return chooseTarget(numOfReplicas, writer, 
+            new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
+      }
+      
+      /**
+       * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+       * to re-replicate a block with size <i>blocksize</i>.
+       * If that many are not available, return as many as we can.
+       * 
+       * @param numOfReplicas: additional number of replicas wanted.
+       * @param writer: the writer's machine, null if not in the cluster.
+       * @param choosenNodes: datanodes that have already been chosen as targets.
+       * @param excludedNodes: datanodes that should not be considered targets.
+       * @param blocksize: size of the data to be written.
+       * @return array of DatanodeDescriptor instances chosen as targets
+       * and sorted as a pipeline.
+       */
+      DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+          DatanodeDescriptor writer,
+          List<DatanodeDescriptor> choosenNodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize ) {
+        if( numOfReplicas == 0 )
+          return new DatanodeDescriptor[0];
+        
+        if( excludedNodes == null) {
+          excludedNodes = new ArrayList<DatanodeDescriptor>();
         }
         
-        double avgLoad = 0.0;
-        if (heartbeats.size() != 0) {
-          avgLoad = (double) totalLoad() / heartbeats.size();
-        }
-        // choose local replica first
-        if (desiredReplicates != 0) {
-          // make sure that the client machine is not forbidden
-          if (clientMachine != null && clientMachine.getLength() > 0) {
-            for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
-                 it.hasNext();) {
-              DatanodeDescriptor node = it.next();
-              if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
-                  clientMachine.toString().equals(node.getHost()) &&
-                  !node.isDecommissionInProgress() && !node.isDecommissioned()) {
-                if ((node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
-                    (node.getXceiverCount() <= (2.0 * avgLoad))) {
-                  targets.add(node);
-                  desiredReplicates--;
-                  break;
-                }
-              }
+        int clusterSize = clusterMap.getNumOfLeaves();
+        int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
+        if( totalNumOfReplicas > clusterSize) {
+          numOfReplicas -= (totalNumOfReplicas-clusterSize);
+          totalNumOfReplicas = clusterSize;
+        }
+        
+        int maxNodesPerRack = 
+          (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+        
+        List<DatanodeDescriptor> results = 
+          new ArrayList<DatanodeDescriptor>(choosenNodes);
+        excludedNodes.addAll(choosenNodes);
+        
+        if(!clusterMap.contains(writer))
+          writer=null;
+        
+        DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
+            clusterMap.getLeaves(NodeBase.ROOT),
+            excludedNodes, blocksize, maxNodesPerRack, results );
+        
+        results.removeAll(choosenNodes);
+        
+        // sorting nodes to form a pipeline
+        return getPipeline((writer==null)?localNode:writer, results);
+      }
+      
+      /* choose <i>numOfReplicas</i> from <i>clusterNodes</i> */
+      private DatanodeDescriptor chooseTarget(int numOfReplicas,
+          DatanodeDescriptor writer,
+          DatanodeDescriptor[] clusterNodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxNodesPerRack,
+          List<DatanodeDescriptor> results) {
+        
+        if( numOfReplicas == 0 ) return writer;
+        
+        int numOfResults = results.size();
+        if(writer == null && (numOfResults==1 || numOfResults==2) ) {
+          writer = results.get(0);
+        }
+        
+        try {
+          switch( numOfResults ) {
+          case 0:
+            writer = chooseLocalNode(writer, clusterNodes, excludedNodes, 
+                blocksize, maxNodesPerRack, results);
+            if(--numOfReplicas == 0) break;
+          case 1:
+            chooseRemoteRack(1, writer, clusterNodes, excludedNodes, 
+                blocksize, maxNodesPerRack, results);
+            if(--numOfReplicas == 0) break;
+          case 2:
+            if(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+              chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
+                  blocksize, maxNodesPerRack, results);
+            } else {
+              chooseLocalRack(writer, clusterNodes, excludedNodes, 
+                  blocksize, maxNodesPerRack, results);
             }
+            if(--numOfReplicas == 0) break;
+          default:
+            chooseRandom(numOfReplicas, clusterNodes, excludedNodes, 
+                blocksize, maxNodesPerRack, results);
           }
+        } catch (NotEnoughReplicasException e) {
+          LOG.warn("Not be able to place enough replicas, still in need of "
+              + numOfReplicas );
         }
-
-        for (int i = 0; i < desiredReplicates; i++) {
-          DatanodeDescriptor target = null;
-          //
-          // Otherwise, choose node according to target capacity
-          //
-          int nNodes = heartbeats.size();
-          int idx = r.nextInt(nNodes);
-          int rejected = 0;
-          while (target == null && rejected < nNodes ) {
-            DatanodeDescriptor node = heartbeats.get(idx);
-            if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
-                !targets.contains(node) &&
-                !node.isDecommissionInProgress() && !node.isDecommissioned() &&
-                (node.getRemaining() >= blockSize * MIN_BLOCKS_FOR_WRITE) &&
-                (node.getXceiverCount() <= (2.0 * avgLoad))) {
-              target = node;
+        return writer;
+      }
+      
+      /* choose <i>localMachine</i> as the target.
+       * if <i>localMachine</i> is not available, 
+       * choose a node on the same rack
+       * @return the chosen node
+       */
+      private DatanodeDescriptor chooseLocalNode(
+          DatanodeDescriptor localMachine,
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxNodesPerRack,
+          List<DatanodeDescriptor> results)
+      throws NotEnoughReplicasException {
+        // if no local machine, randomly choose one node
+        if(localMachine == null)
+          return chooseRandom(nodes, excludedNodes, 
+              blocksize, maxNodesPerRack, results);
+        
+        // otherwise try local machine first
+        if(!excludedNodes.contains(localMachine)) {
+          excludedNodes.add(localMachine);
+          if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) {
+            results.add(localMachine);
+            return localMachine;
+          }
+        } 
+        
+        // try a node on local rack
+        return chooseLocalRack(localMachine, nodes, excludedNodes, 
+            blocksize, maxNodesPerRack, results);
+      }
+      
+      /* choose one node from the rack that <i>localMachine</i> is on.
+       * if no such node is available, choose one node from the rack where
+       * a second replica is on.
+       * if still no such node is available, choose a random node 
+       * in the cluster <i>nodes</i>.
+       * @return the chosen node
+       */
+      private DatanodeDescriptor chooseLocalRack(
+          DatanodeDescriptor localMachine,
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxNodesPerRack,
+          List<DatanodeDescriptor> results)
+      throws NotEnoughReplicasException {
+        // no local machine, so choose a random machine
+        if( localMachine == null ) {
+          return chooseRandom(nodes, excludedNodes, 
+              blocksize, maxNodesPerRack, results );
+        }
+        
+        // choose one from the local rack
+        try {
+          return chooseRandom(
+              clusterMap.getLeaves( localMachine.getNetworkLocation() ),
+              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch (NotEnoughReplicasException e1) {
+          // find the second replica
+          DatanodeDescriptor newLocal=null;
+          for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+            DatanodeDescriptor nextNode = iter.next();
+            if(nextNode != localMachine) {
+              newLocal = nextNode;
               break;
-            } else {
-              idx = (idx+1) % nNodes;
-              rejected++;
             }
           }
-          if (target == null) {
-            idx = r.nextInt(nNodes);
-            rejected = 0;
-            while (target == null && rejected < nNodes ) {
-              DatanodeDescriptor node = heartbeats.get(idx);
-              if ((forbiddenNodes == null || !forbiddenNodes.contains(node)) &&
-                  !targets.contains(node) &&
-                  !node.isDecommissionInProgress() && !node.isDecommissioned() &&
-                  node.getRemaining() >= blockSize) {
-                target = node;
-                break;
-              } else {
-                idx = (idx + 1) % nNodes;
-                rejected++;
-              }
+          if( newLocal != null ) {
+            try {
+              return chooseRandom(
+                  clusterMap.getLeaves( newLocal.getNetworkLocation() ),
+                  excludedNodes, blocksize, maxNodesPerRack, results);
+            } catch( NotEnoughReplicasException e2 ) {
+              //otherwise randomly choose one from the network
+              return chooseRandom(nodes, excludedNodes,
+                  blocksize, maxNodesPerRack, results);
             }
+          } else {
+            //otherwise randomly choose one from the network
+            return chooseRandom(nodes, excludedNodes,
+                blocksize, maxNodesPerRack, results);
           }
-          
-          if (target == null) {
-            LOG.warn("Could not find any nodes with sufficient capacity");
-            break; // making one more pass over heartbeats would not help
+        }
+      }
+      
+      /* choose <i>numOfReplicas</i> nodes from the racks 
+       * that <i>localMachine</i> is NOT on.
+       * if not enough nodes are available, choose the remaining ones 
+       * from the local rack
+       */
+      
+      private void chooseRemoteRack( int numOfReplicas,
+          DatanodeDescriptor localMachine,
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxReplicasPerRack,
+          List<DatanodeDescriptor> results)
+      throws NotEnoughReplicasException {
+        // get all the nodes on the local rack
+        DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves(
+            localMachine.getNetworkLocation() );
+        
+        // can we speed up this??? using hashing sets?
+        DatanodeDescriptor[] nodesOnRemoteRack 
+        = new DatanodeDescriptor[nodes.length-nodesOnRack.length];
+        HashSet<DatanodeDescriptor> set1 = new HashSet<DatanodeDescriptor>(nodes.length);
+        HashSet<DatanodeDescriptor> set2 = new HashSet<DatanodeDescriptor>(nodesOnRack.length);
+        for(int i=0; i<nodes.length; i++) {
+          set1.add(nodes[i]);
+        }
+        for(int i=0; i<nodesOnRack.length; i++) {
+          set2.add(nodesOnRack[i]);
+        }
+        set1.removeAll(set2);
+        nodesOnRemoteRack = set1.toArray(nodesOnRemoteRack);
+        
+        int oldNumOfReplicas = results.size();
+        // randomly choose one node from remote racks
+        try {
+          chooseRandom( numOfReplicas, nodesOnRemoteRack, excludedNodes, 
+              blocksize, maxReplicasPerRack, results );
+        } catch (NotEnoughReplicasException e) {
+          chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas),
+              nodesOnRack, excludedNodes, blocksize, 
+              maxReplicasPerRack, results);
+        }
+      }
+      
+      /* Randomly choose one target from <i>nodes</i>.
+       * @return the chosen node
+       */
+      private DatanodeDescriptor chooseRandom(
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxNodesPerRack,
+          List<DatanodeDescriptor> results) 
+      throws NotEnoughReplicasException {
+        DatanodeDescriptor result;
+        do {
+          DatanodeDescriptor[] selectedNodes = 
+            chooseRandom(1, nodes, excludedNodes);
+          if(selectedNodes.length == 0 ) {
+            throw new NotEnoughReplicasException( 
+            "Not able to place enough replicas" );
+          }
+          result = (DatanodeDescriptor)(selectedNodes[0]);
+        } while( !isGoodTarget( result, blocksize, maxNodesPerRack, results));
+        results.add(result);
+        return result;
+      }
+      
+      /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+       */
+      private void chooseRandom(int numOfReplicas,
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes,
+          long blocksize,
+          int maxNodesPerRack,
+          List<DatanodeDescriptor> results)
+      throws NotEnoughReplicasException {
+        boolean toContinue = true;
+        do {
+          DatanodeDescriptor[] selectedNodes = 
+            chooseRandom(numOfReplicas, nodes, excludedNodes);
+          if(selectedNodes.length < numOfReplicas) {
+            toContinue = false;
+          }
+          for(int i=0; i<selectedNodes.length; i++) {
+            DatanodeDescriptor result = (DatanodeDescriptor)(selectedNodes[i]);
+            if( isGoodTarget( result, blocksize, maxNodesPerRack, results)) {
+              numOfReplicas--;
+              results.add(result);
+            }
+          } // end of for
+        } while (numOfReplicas>0 && toContinue );
+        
+        if(numOfReplicas>0) {
+          throw new NotEnoughReplicasException( 
+          "Not able to place enough replicas");
+        }
+      }
+      
+      /* Randomly choose <i>numOfReplicas</i> nodes from <i>nodes</i>.
+       * @return the chosen nodes
+       */
+      private DatanodeDescriptor[] chooseRandom(int numOfReplicas, 
+          DatanodeDescriptor[] nodes,
+          List<DatanodeDescriptor> excludedNodes) {
+        List<DatanodeDescriptor> results = 
+          new ArrayList<DatanodeDescriptor>();
+        int numOfAvailableNodes = 0;
+        for(int i=0; i<nodes.length; i++) {
+          // count only nodes that the selection loop below can actually pick
+          if( !excludedNodes.contains(nodes[i]) &&
+              !nodes[i].isDecommissionInProgress() &&
+              !nodes[i].isDecommissioned() ) {
+            numOfAvailableNodes++;
+          }
+        }
+        numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
+            numOfAvailableNodes:numOfReplicas;
+        while( numOfReplicas > 0 ) {
+          DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)];
+          if(!excludedNodes.contains(choosenNode) &&
+               !choosenNode.isDecommissionInProgress() &&
+               !choosenNode.isDecommissioned()) {
+            results.add( choosenNode );
+            excludedNodes.add(choosenNode);
+            numOfReplicas--;
           }
-          targets.add(target);
         }
+        return (DatanodeDescriptor[])results.toArray(
+            new DatanodeDescriptor[results.size()]);    
+      }
+      
+      /* judge if a node is a good target.
+       * return true if <i>node</i> has enough space, 
+       * does not have too much load, and the rack does not have too many nodes
+       */
+      private boolean isGoodTarget( DatanodeDescriptor node,
+          long blockSize, int maxTargetPerLoc,
+          List<DatanodeDescriptor> results) {
         
-        return (DatanodeDescriptor[]) targets.toArray(new DatanodeDescriptor[targets.size()]);
-    }
+        // check if the node is (being) decommissioned
+        if(node.isDecommissionInProgress() || node.isDecommissioned()) {
+          return false;
+        }
+
+        // check the remaining capacity of the target machine
+        if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) {
+          return false;
+        }
+        
+        // check the communication traffic of the target machine
+        double avgLoad = 0;
+        int size = clusterMap.getNumOfLeaves();
+        if( size != 0 ) {
+          avgLoad = (double)totalLoad()/size;
+        }
+        if(node.getXceiverCount() > (2.0 * avgLoad)) {
+          return false;
+        }
+        
+        // check if the target rack has chosen too many nodes
+        String rackname = node.getNetworkLocation();
+        int counter=1;
+        for( Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext(); ) {
+          DatanodeDescriptor result = iter.next();
+          if(rackname.equals(result.getNetworkLocation())) {
+            counter++;
+          }
+        }
+        if(counter>maxTargetPerLoc) {
+          return false;
+        }
+        return true;
+      }
+      
+      /* Return a pipeline of nodes.
+       * The pipeline is formed by finding a shortest path that 
+       * starts from the writer and traverses all <i>nodes</i>.
+       * This is basically a traveling salesman problem.
+       */
+      private DatanodeDescriptor[] getPipeline(
+          DatanodeDescriptor writer,
+          List<DatanodeDescriptor> nodes ) {
+        int numOfNodes = nodes.size();
+        DatanodeDescriptor[] results = new DatanodeDescriptor[numOfNodes];
+        if( numOfNodes==0 ) return results;
+        
+        synchronized( clusterMap ) {
+          int index=0;
+          if(writer == null || !clusterMap.contains(writer)) {
+            writer = nodes.get(0);
+          }
+          for( ;index<numOfNodes; index++ ) {
+            DatanodeDescriptor shortestNode = null;
+            int shortestDistance = Integer.MAX_VALUE;
+            int shortestIndex = index;
+            for( int i=index; i<numOfNodes; i++ ) {
+              DatanodeDescriptor currentNode = nodes.get(i);
+              int currentDistance = clusterMap.getDistance( writer, currentNode );
+              if(shortestDistance>currentDistance ) {
+                shortestDistance = currentDistance;
+                shortestNode = currentNode;
+                shortestIndex = i;
+              }
+            }
+            // swap the nodes at index and shortestIndex
+            if( index != shortestIndex ) {
+              nodes.set(shortestIndex, nodes.get(index));
+              nodes.set(index, shortestNode);
+            }
+            writer = shortestNode;
+          }
+        }
+        return nodes.toArray( results );
+      }
+      
+      /** Return datanodes sorted by their distance to <i>reader</i>
+       */
+      DatanodeDescriptor[] sortByDistance( 
+          final DatanodeDescriptor reader,
+          List<DatanodeDescriptor> nodes ) {
+          synchronized(clusterMap) {
+              if(reader != null && clusterMap.contains(reader)) {
+                  java.util.Collections.sort(nodes, new Comparator<DatanodeDescriptor>() {
+                      public int compare(DatanodeDescriptor n1, DatanodeDescriptor n2) {
+                          return clusterMap.getDistance(reader, n1)
+                          -clusterMap.getDistance(reader, n2);
+                      }
+                  });
+              }
+          }
+          return (DatanodeDescriptor[])nodes.toArray(
+                  new DatanodeDescriptor[nodes.size()]);
+      }
+      
+    } //end of Replicator
+
 
     /**
      * Information about the file while it is being written to.
@@ -2664,6 +3067,18 @@ class FSNamesystem implements FSConstants {
       }
       return null;
     }
+    
+    /* Find data node by its host name. */
+    private DatanodeDescriptor getDatanodeByHost( String name ) {
+        for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
+        it.hasNext(); ) {
+            DatanodeDescriptor node = it.next();
+            if( node.getHost().equals(name) )
+                return node;
+        }
+        return null;
+    }
+    
     /** Stop at and return the datanode at index (used for content browsing)*/
     private DatanodeInfo getDatanodeByIndex( int index ) {
       int i = 0;
@@ -2802,6 +3217,9 @@ class FSNamesystem implements FSConstants {
             "STATE* SafeModeInfo.leave: " + "Safe mode is OFF." ); 
         reached = -1;
         safeMode = null;
+        NameNode.stateChangeLog.info("STATE* Network topology has "
+                +clusterMap.getNumOfRacks()+" racks and "
+                +clusterMap.getNumOfLeaves()+ " datanodes");
       }
       
       /** 

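To make the placement order concrete, here is a small standalone paraphrase (a sketch, not the patch's code) of the switch in Replicator.chooseTarget; the enum names are invented for illustration:

    public class PlacementOrderDemo {
        enum Choice { LOCAL_NODE, REMOTE_RACK, LOCAL_RACK, RANDOM }

        // alreadyChosen mirrors results.size(); firstTwoShareRack mirrors
        // clusterMap.isOnSameRack(results.get(0), results.get(1))
        static Choice nextTarget(int alreadyChosen, boolean firstTwoShareRack) {
            switch (alreadyChosen) {
            case 0:  return Choice.LOCAL_NODE;   // writer's machine, if it is a datanode
            case 1:  return Choice.REMOTE_RACK;  // guards against losing a whole rack
            case 2:  return firstTwoShareRack ? Choice.REMOTE_RACK : Choice.LOCAL_RACK;
            default: return Choice.RANDOM;       // spread any further replicas
            }
        }

        public static void main(String[] args) {
            for (int i = 0; i < 4; i++) {
                System.out.println("replica " + (i + 1) + ": " + nextTarget(i, false));
            }
        }
    }

The chosen nodes are then ordered into a write pipeline by getPipeline, which greedily walks from the writer to its nearest remaining target.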
+ 3 - 2
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -539,10 +539,11 @@ public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     ////////////////////////////////////////////////////////////////
     /** 
      */
-    public DatanodeRegistration register( DatanodeRegistration nodeReg
+    public DatanodeRegistration register( DatanodeRegistration nodeReg,
+                                          String networkLocation
                                         ) throws IOException {
       verifyVersion( nodeReg.getVersion() );
-      namesystem.registerDatanode( nodeReg );
+      namesystem.registerDatanode( nodeReg, networkLocation );
       return nodeReg;
     }
     

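The final file in the patch introduces the NetworkTopology class itself. Below is a hedged usage sketch of the API the rest of the patch relies on (add, getDistance, isOnSameRack, getNumOfRacks); the DatanodeID constructor arguments are an assumption and may differ in this revision:

    import org.apache.hadoop.dfs.DatanodeDescriptor;
    import org.apache.hadoop.dfs.DatanodeID;
    import org.apache.hadoop.net.NetworkTopology;

    public class TopologyDemo {
        public static void main(String[] args) {
            NetworkTopology cluster = new NetworkTopology();
            // DatanodeID construction is assumed to take (name, storageID, infoPort)
            DatanodeDescriptor a = new DatanodeDescriptor(new DatanodeID("h1:50010", "s1", 50075), "/rack1");
            DatanodeDescriptor b = new DatanodeDescriptor(new DatanodeID("h2:50010", "s2", 50075), "/rack1");
            DatanodeDescriptor c = new DatanodeDescriptor(new DatanodeID("h3:50010", "s3", 50075), "/rack2");
            cluster.add(a);
            cluster.add(b);
            cluster.add(c);
            System.out.println(cluster.isOnSameRack(a, b)); // true: both under /rack1
            System.out.println(cluster.getDistance(a, b));  // 2: via the rack switch
            System.out.println(cluster.getDistance(a, c));  // 4: via the common root
            System.out.println(cluster.getNumOfRacks());    // 2
        }
    }

Leaves are datanodes and inner nodes are rack or data-center switches, so distance counts tree edges between two leaves.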
+ 425 - 0
src/java/org/apache/hadoop/net/NetworkTopology.java

@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.dfs.DatanodeDescriptor;
+
+/** The class represents a cluster of computers with a tree-structured
+ * hierarchical network topology.
+ * For example, a cluster may consist of many data centers filled 
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.  
+ * 
+ * @author hairong
+ *
+ */
+public class NetworkTopology {
+    public final static String DEFAULT_RACK = "/default-rack";
+    public static final Log LOG = 
+        LogFactory.getLog("org.apache.hadoop.net.NetworkTopology");
+    
+    /* An InnerNode represents a switch/router of a data center or rack.
+     * Unlike a leaf node, it has non-null children.
+     */
+    private class InnerNode extends NodeBase {
+        private HashMap<String, Node> children = 
+            new HashMap<String, Node>(); // maps a name to a node 
+        
+        /** Construct an InnerNode from a path-like string */
+        InnerNode( String path ) {
+            super( path );
+        }
+        
+        /** Construct an InnerNode from its name and its network location */
+        InnerNode( String name, String location ) {
+            super( name, location );
+        }
+        
+        /** Get its children */
+        HashMap<String, Node> getChildren() {return children;}
+        
+        /** Return the number of children this node has */
+        int getNumOfChildren() {
+            return children.size();
+        }
+        
+        /** Judge if this node represents a rack 
+         * Return true if it has no children or its children are not InnerNodes
+         */ 
+        boolean isRack() {
+            if(children.isEmpty()) {
+                return true;
+            }
+            
+            Node firstChild = children.values().iterator().next();
+            if(firstChild instanceof InnerNode) {
+                return false;
+            }
+            
+            return true;
+        }
+        
+        /** Check whether this node is an ancestor of node <i>n</i>
+         * 
+         * @param n a node
+         * @return true if this node is an ancestor of <i>n</i>
+         */
+        boolean isAncestor(Node n) {
+            return n.getNetworkLocation().startsWith(getPath());
+        }
+        
+        /** Check whether this node is the parent of node <i>n</i>
+         * 
+         * @param n a node
+         * @return true if this node is the parent of <i>n</i>
+         */
+        boolean isParent( Node n ) {
+            return n.getNetworkLocation().equals( getPath() );
+        }
+        
+        /* Return the name of this node's child that is an ancestor of node <i>n</i> */
+        private String getNextAncestorName( Node n ) {
+            if( !isAncestor(n)) {
+                throw new IllegalArgumentException( 
+                        this + "is not an ancestor of " + n);
+            }
+            String name = n.getNetworkLocation().substring(getPath().length());
+            if(name.charAt(0) == PATH_SEPARATOR) {
+                name = name.substring(1);
+            }
+            int index=name.indexOf(PATH_SEPARATOR);
+            if( index !=-1 )
+                name = name.substring(0, index);
+            return name;
+        }
+        
+        /** Add node <i>n</i> to the subtree of this node 
+         * @param n node to be added
+         * @return true if the node is added; false otherwise
+         */
+        boolean add( Node n ) {
+            String parent = n.getNetworkLocation();
+            String currentPath = getPath();
+            if( !isAncestor( n ) )
+                throw new IllegalArgumentException( n.getName()+", which is located at "
+                        +parent+", is not a decendent of "+currentPath);
+            if( isParent( n ) ) {
+                // this node is the parent of n; add n directly
+                return (null == children.put(n.getName(), n) );
+            } else {
+                // find the next ancestor node
+                String parentName = getNextAncestorName( n );
+                InnerNode parentNode = (InnerNode)children.get(parentName);
+                if( parentNode == null ) {
+                    // create a new InnerNode
+                    parentNode = new InnerNode( parentName, currentPath );
+                    children.put(parentName, parentNode);
+                }
+                // add n to the subtree of the next ancestor node
+                return parentNode.add(n);
+            }
+        }
+        
+        /** Remove node <i>n</i> from the subtree of this node
+         * @param n node to be deleted 
+         * @return true if the node is deleted; false otherwise
+         */
+        boolean remove( Node n ) {
+            String parent = n.getNetworkLocation();
+            String currentPath = getPath();
+            if(!isAncestor(n))
+                throw new IllegalArgumentException( n.getName()+", which is located at "
+                        +parent+", is not a decendent of "+currentPath);
+            if( isParent(n) ) {
+                // this node is the parent of n; remove n directly
+                return (n == children.remove(n.getName()));
+            } else {
+                // find the next ancestor node: the parent node
+                String parentName = getNextAncestorName( n );
+                InnerNode parentNode = (InnerNode)children.get(parentName);
+                if(parentNode==null) {
+                    throw new IllegalArgumentException( n.getName()
+                            + ", which is located at "
+                            + parent+", is not a decendent of " + currentPath);
+                }
+                // remove n from the parent node
+                boolean isRemoved = parentNode.remove( n );
+                // if the parent node has no children, remove the parent node too
+                if(parentNode.getNumOfChildren() == 0 ) {
+                    children.remove(parentName);
+                }
+                return isRemoved;
+            }
+        } // end of remove
+        
+        /** Given a node's string representation, return a reference to the node */ 
+        Node getLoc( String loc ) {
+            if( loc == null || loc.length() == 0 ) return this;
+            String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+            Node childnode = children.get( path[0] );
+            if(childnode == null ) return null; // non-existing node
+            if( path.length == 1 ) return childnode;
+            if( childnode instanceof InnerNode ) {
+                return ((InnerNode)childnode).getLoc(path[1]);
+            } else {
+                return null;
+            }
+        }
+        
+        /** Collect all the data nodes that belong to the subtree of this node */
+        void getLeaves( Collection<DatanodeDescriptor> results ) {
+            for( Iterator<Node> iter = children.values().iterator();
+                 iter.hasNext(); ) {
+                Node childNode = iter.next();
+                if( childNode instanceof InnerNode ) {
+                    ((InnerNode)childNode).getLeaves(results);
+                } else {
+                    results.add( (DatanodeDescriptor)childNode );
+                }
+            }
+        }
+    } // end of InnerNode
+    
+    InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); //the root of the tree
+    private int numOfLeaves = 0; // data nodes counter
+    private int numOfRacks = 0;  // rack counter
+    
+    public NetworkTopology() {
+    }
+    
+    /** Add a data node
+     * Update the data node counter & rack counter if necessary
+     * @param node
+     *          data node to be added
+     * @exception IllegalArgumentException if the node's network location
+     *          resolves to an existing leaf rather than an inner node
+     */
+    public synchronized void add( DatanodeDescriptor node ) {
+        if( node==null ) return;
+        LOG.info("Adding a new node: "+node.getPath());
+        Node rack = getNode(node.getNetworkLocation());
+        if(rack != null && !(rack instanceof InnerNode) ) {
+            throw new IllegalArgumentException( "Unexpected data node " 
+                    + node.toString() 
+                    + " at an illegal network location");
+        }
+        if( clusterMap.add( node) ) {
+            numOfLeaves++;
+            if( rack == null ) {
+                numOfRacks++;
+            }
+        }
+        LOG.debug("NetworkTopology became:\n" + this.toString());
+    }
+    
+    /** Remove a data node
+     * Update the data node counter & rack counter if necessary
+     * @param node
+     *          data node to be removed
+     */ 
+    public synchronized void remove( DatanodeDescriptor node ) {
+        if( node==null ) return;
+        LOG.info("Removing a node: "+node.getPath());
+        if( clusterMap.remove( node ) ) {
+            numOfLeaves--;
+            InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
+            if(rack == null) {
+                numOfRacks--;
+            }
+        }
+        LOG.debug("NetworkTopology became:\n" + this.toString());
+    }
+       
+    /** Check if the tree contains data node <i>node</i>
+     * 
+     * @param node
+     *          a data node
+     * @return true if <i>node</i> is already in the tree; false otherwise
+     */
+    public boolean contains( DatanodeDescriptor node ) {
+        if( node == null ) return false;
+        Node rNode = getNode(node.getPath());
+        return (rNode == node); 
+    }
+    
+    /** Given a string representation of a node, return the reference to the node
+     * 
+     * @param loc
+     *          a path-like string representation of a node
+     * @return a reference to the node; null if the node is not in the tree
+     */
+    public synchronized Node getNode( String loc ) {
+        loc = NodeBase.normalize(loc);
+        if(!NodeBase.ROOT.equals(loc))
+            loc = loc.substring(1);
+        return clusterMap.getLoc( loc );
+    }
+    
+    /* Add all the data nodes that belong to 
+     * the subtree of the node <i>loc</i> to <i>results</i>*/
+    private synchronized void getLeaves( String loc,
+            Collection<DatanodeDescriptor> results ) {
+        Node node = getNode(loc);
+        if( node instanceof InnerNode )
+            ((InnerNode)node).getLeaves(results);
+        else {
+            results.add((DatanodeDescriptor)node);
+        }
+    }
+    
+    /** Return all the data nodes that belong to the subtree of <i>loc</i>
+     * @param loc
+     *          a path-like string representation of a node
+     * @return an array of data nodes that belong to the subtree of <i>loc</i>
+     */
+    public synchronized DatanodeDescriptor[] getLeaves( String loc ) {
+        Collection<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
+        getLeaves(loc, results);
+        return results.toArray(new DatanodeDescriptor[results.size()]);
+    }
+    
+    /** Return all the data nodes that belong to the subtrees of <i>locs</i>
+     * @param locs
+     *          a collection of strings representing nodes
+     * @return an array of data nodes that belong to subtrees of <i>locs</i>
+     */
+    public synchronized DatanodeDescriptor[] getLeaves(
+            Collection<String> locs ) {
+        Collection<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>();
+        if( locs != null ) { 
+            Iterator<String> iter = locs.iterator();
+            while(iter.hasNext()) {
+                getLeaves( iter.next(), nodes );
+            }
+        }
+        return nodes.toArray(new DatanodeDescriptor[nodes.size()]);
+    }
+    
+    /** Return the total number of racks */
+    public int getNumOfRacks( ) {
+        return numOfRacks;
+    }
+    
+    /** Return the total number of data nodes */
+    public int getNumOfLeaves() {
+        return numOfLeaves;
+    }
+    
+    private void checkArgument( DatanodeDescriptor node ) {
+        if( node == null ) {
+            throw new IllegalArgumentException( 
+                    "Unexpected null pointer argument" );
+        }
+        if( !contains(node) ) {
+            String path = node.getPath();
+            LOG.warn("The cluster does not contain data node: " + path);
+            throw new IllegalArgumentException(
+                    "Unexpected non-existing data node: " +path);
+        }
+    }
+    
+    /** Return the distance between two data nodes.
+     * It is assumed that the distance from a node to its parent is 1.
+     * The distance between two nodes is calculated by summing up their
+     * distances to their closest common ancestor. For example, nodes at
+     * /d1/r1 and /d1/r2 are at distance 4: two hops up to /d1, then two down.
+     * @param node1 one data node
+     * @param node2 another data node
+     * @return the distance between node1 and node2
+     * @exception IllegalArgumentException when either node1 or node2 is null,
+     * or node1 or node2 does not belong to the cluster
+     */
+    public int getDistance(DatanodeDescriptor node1, DatanodeDescriptor node2 ) {
+        checkArgument( node1 );
+        checkArgument( node2 );
+        if( node1 == node2 || node1.equals(node2)) {
+            return 0;
+        }
+        String[] path1 = node1.getNetworkLocation().split("/");
+        String[] path2 = node2.getNetworkLocation().split("/");
+        
+        int i;
+        for(i=0; i<Math.min(path1.length, path2.length); i++) {
+            if( !path1[i].equals(path2[i]) ) {
+                break;
+            }
+        }
+        return 2+(path1.length-i)+(path2.length-i);
+    } 
+    
+    /** Check if two data nodes are on the same rack
+     * @param node1 one data node
+     * @param node2 another data node
+     * @return true if node1 and node2 are on the same rack; false otherwise
+     * @exception IllegalArgumentException when either node1 or node2 is null,
+     * or node1 or node2 does not belong to the cluster
+     */
+    public boolean isOnSameRack(
+            DatanodeDescriptor node1, DatanodeDescriptor node2) {
+        checkArgument( node1 );
+        checkArgument( node2 );
+        if( node1 == node2 || node1.equals(node2)) {
+            return true;
+        }
+        
+        String location1 = node1.getNetworkLocation();
+        String location2 = node2.getNetworkLocation();
+        
+        if(location1 == location2 ) return true;
+        if(location1 == null || location2 == null) return false;
+        return location1.equals(location2);
+    }
+    
+    /** convert a network tree to a string */
+    public String toString() {
+        // print the number of racks
+        StringBuffer tree = new StringBuffer();
+        tree.append( "Number of racks: " );
+        tree.append( numOfRacks );
+        tree.append( "\n" );
+        // print the number of leaves
+        tree.append( "Expected number of leaves:" );
+        tree.append( numOfLeaves );
+        tree.append( "\n" );
+        // get all datanodes
+        DatanodeDescriptor[] datanodes = getLeaves( NodeBase.ROOT );
+        // print the number of leaves
+        tree.append( "Actual number of leaves:" );
+        tree.append( datanodes.length );
+        tree.append( "\n" );
+        // print datanodes
+        for( int i=0; i<datanodes.length; i++ ) {
+            tree.append( datanodes[i].getPath() );
+            tree.append( "\n");
+        }
+        return tree.toString();
+    }
+}
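
A minimal usage sketch of the new topology map, mirroring the calls the tests below make (the class name, host names, and rack paths are illustrative only, not part of the patch):

    import org.apache.hadoop.dfs.DatanodeDescriptor;
    import org.apache.hadoop.dfs.DatanodeID;
    import org.apache.hadoop.net.NetworkTopology;

    public class TopologySketch {
      public static void main(String[] args) {
        NetworkTopology cluster = new NetworkTopology();
        // two nodes on rack r1 and one on rack r2, all in data center d1
        DatanodeDescriptor a =
            new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1");
        DatanodeDescriptor b =
            new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1");
        DatanodeDescriptor c =
            new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2");
        cluster.add(a);
        cluster.add(b);
        cluster.add(c);
        System.out.println(cluster.getNumOfRacks());    // 2
        System.out.println(cluster.isOnSameRack(a, b)); // true
        System.out.println(cluster.getDistance(a, c));  // 4: a->r1->d1->r2->c
      }
    }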

+ 37 - 0
src/java/org/apache/hadoop/net/Node.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+/** The interface defines a node in a network topology.
+ * A node may be a leaf representing a data node or an inner
+ * node representing a data center or rack.
+ * Each node has a name, and its location in the network is
+ * described by a string whose syntax resembles a file path. 
+ * For example, a data node's name is hostname:port# and if it is located in
+ * rack "orange" in data center "dog", the string representation of its
+ * network location is /dog/orange
+ * @author hairong
+ *
+ */
+
+public interface Node {
+  /** Return the string representation of this node's network location */
+  public String getNetworkLocation();
+  /** Return this node's name */
+  public String getName();
+}
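
The Node contract is intentionally tiny; a trivial, purely illustrative implementation that hard-codes the /dog/orange example from the javadoc above:

    import org.apache.hadoop.net.Node;

    // a leaf named "host1:5020" in rack "orange" of data center "dog"
    class StaticNode implements Node {
      public String getNetworkLocation() { return "/dog/orange"; }
      public String getName() { return "host1:5020"; }
    }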

+ 101 - 0
src/java/org/apache/hadoop/net/NodeBase.java

@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+/** A base class that implements interface Node
+ * 
+ * @author hairong
+ *
+ */
+
+public class NodeBase implements Node {
+  public final static char PATH_SEPARATOR = '/';
+  public final static String PATH_SEPARATOR_STR = "/";
+  public final static String ROOT = ""; // string representation of root
+  
+  protected String name; //host:port#
+  protected String location; //string representation of this node's location
+  
+  /** Default constructor */
+  public NodeBase( ) {
+  }
+  
+  /** Construct a node from its path
+   * @param path 
+   *   a concatenation of this node's location, the path separator, and its name 
+   */
+  public NodeBase( String path ) {
+    path = normalize(path);
+    int index = path.lastIndexOf( PATH_SEPARATOR );
+    if( index== -1 ) {
+      set( ROOT, path );
+    } else {
+      set( path.substring(index+1), path.substring(0, index) );
+    }
+  }
+  
+  /** Construct a node from its name and its location
+   * @param name this node's name 
+   * @param location this node's location 
+   */
+  public NodeBase( String name, String location ) {
+    set(name, normalize(location));
+  }
+  
+  /* set this node's name and location */
+  private void set( String name, String location ) {
+    if(name != null && name.contains(PATH_SEPARATOR_STR))
+      throw new IllegalArgumentException(
+          "Network location name contains /: "+name);
+    this.name = (name==null)?"":name;
+    this.location = location;      
+  }
+  
+  /** Return this node's name */
+  public String getName() { return name; }
+  
+  /** Return this node's network location */
+  public String getNetworkLocation() { return location; }
+  
+  /** Return this node's path */
+  public String getPath() {
+    return location+PATH_SEPARATOR_STR+name;
+  }
+  
+  /** Return this node's string representation */
+  public String toString() {
+    return getPath();
+  }
+
+  /** Normalize a path */
+  static public String normalize(String path) {
+    if( path == null || path.length() == 0 ) return ROOT;
+    
+    if( path.charAt(0) != PATH_SEPARATOR ) {
+      throw new IllegalArgumentException( 
+          "Network Location path does not start with "
+          +PATH_SEPARATOR_STR+ ": "+path);
+    }
+    
+    int len = path.length();
+    if(path.charAt(len-1) == PATH_SEPARATOR) {
+      return path.substring(0, len-1);
+    }
+    return path;
+  }
+}
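
A short sketch of NodeBase's path handling; the expected values in the comments follow from the parsing and normalize() rules above (the class name is illustrative):

    import org.apache.hadoop.net.NodeBase;

    public class NodeBaseSketch {
      public static void main(String[] args) {
        // a path is split at its last separator into location + name
        NodeBase n = new NodeBase("/d1/r1/h1:5020");
        System.out.println(n.getName());            // h1:5020
        System.out.println(n.getNetworkLocation()); // /d1/r1
        System.out.println(n.getPath());            // /d1/r1/h1:5020
        // normalize() strips a trailing separator; null/"" map to ROOT
        System.out.println(NodeBase.normalize("/d1/r1/")); // /d1/r1
      }
    }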

+ 36 - 6
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.dfs;
 import java.io.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.net.NetworkTopology;
 
 /**
  * This class creates a single-process DFS cluster for junit testing.
@@ -95,9 +96,14 @@ public class MiniDFSCluster {
       this.conf.set("dfs.data.dir",
           new File(dataDir, "data"+(2*index+1)).getPath()+","+
           new File(dataDir, "data"+(2*index+2)).getPath());
-    
     }
     
+    public DataNodeRunner(Configuration conf, File dataDir, 
+        String networkLoc, int index) {
+        this(conf, dataDir, index);
+        this.conf.set("dfs.datanode.rack", networkLoc);
+    }
+        
     /**
      * Create and run the data node.
      */
@@ -115,7 +121,8 @@ public class MiniDFSCluster {
             }
           }
         }
-        node = new DataNode(conf, dirs);
+        node = new DataNode(conf, conf.get("dfs.datanode.rack", 
+            NetworkTopology.DEFAULT_RACK), dirs);
         node.run();
       } catch (Throwable e) {
         node = null;
@@ -144,7 +151,7 @@ public class MiniDFSCluster {
   public MiniDFSCluster(int namenodePort, 
                         Configuration conf,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, 1, dataNodeFirst, true);
+    this(namenodePort, conf, 1, dataNodeFirst, true, null);
   }
   
   /**
@@ -159,7 +166,7 @@ public class MiniDFSCluster {
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst) throws IOException {
-    this(namenodePort, conf, nDatanodes, dataNodeFirst, true);
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, true, null);
   }
   
   /**
@@ -175,7 +182,26 @@ public class MiniDFSCluster {
                         Configuration conf,
                         int nDatanodes,
                         boolean dataNodeFirst,
-                        boolean formatNamenode) throws IOException {
+                        boolean formatNamenode ) throws IOException {
+    this(namenodePort, conf, nDatanodes, dataNodeFirst, formatNamenode, null);
+  }
+  
+  /**
+   * Create the config and start up the servers.  If either the rpc or info port is already 
+   * in use, we will try new ports.
+   * @param namenodePort suggestion for which rpc port to use.  caller should use 
+   *                     getNameNodePort() to get the actual port used.
+   * @param nDatanodes Number of datanodes   
+   * @param dataNodeFirst should the datanode be brought up before the namenode?
+   * @param formatNamenode should the namenode be formatted before starting up ?
+   * @param racks array of strings indicating the rack each datanode is on
+   */
+  public MiniDFSCluster(int namenodePort, 
+                        Configuration conf,
+                        int nDatanodes,
+                        boolean dataNodeFirst,
+                        boolean formatNamenode,
+                        String[] racks) throws IOException {
 
     this.conf = conf;
 
@@ -208,7 +234,11 @@ public class MiniDFSCluster {
       dataNodes = new DataNodeRunner[nDatanodes];
       dataNodeThreads = new Thread[nDatanodes];
       for (int idx = 0; idx < nDatanodes; idx++) {
-        dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
+        if( racks == null || idx >= racks.length) {
+          dataNodes[idx] = new DataNodeRunner(conf, data_dir, idx);
+        } else {
+          dataNodes[idx] = new DataNodeRunner(conf, data_dir, racks[idx], idx);          
+        }
         dataNodeThreads[idx] = new Thread(dataNodes[idx]);
       }
       if (dataNodeFirst) {
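
With the new six-argument constructor, a test can pin each simulated datanode to a rack; a hedged sketch, mirroring the usage in TestReplication below (the port number and rack strings are arbitrary):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.dfs.MiniDFSCluster;

    public class RackedClusterSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // three datanodes: two on rack r1, one on rack r2
        String[] racks = { "/d1/r1", "/d1/r1", "/d1/r2" };
        MiniDFSCluster cluster =
            new MiniDFSCluster(65312, conf, 3, false, true, racks);
        try {
          // ... exercise cluster.getFileSystem() ...
        } finally {
          cluster.shutdown();
        }
      }
    }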

+ 65 - 16
src/test/org/apache/hadoop/dfs/TestReplication.java

@@ -21,21 +21,32 @@ import junit.framework.TestCase;
 import java.io.*;
 import java.util.Random;
 import java.net.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.DNS;
 
 /**
  * This class tests the replication of a DFS file.
- * @author Milind Bhandarkar
+ * @author Milind Bhandarkar, Hairong Kuang
  */
 public class TestReplication extends TestCase {
-  static final long seed = 0xDEADBEEFL;
-  static final int blockSize = 8192;
-  static final int fileSize = 16384;
-  static final int numDatanodes = 4;
+  private static final long seed = 0xDEADBEEFL;
+  private static final int blockSize = 8192;
+  private static final int fileSize = 16384;
+  private static final String racks[] = new String[] {
+    "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"
+  };
+  private static final int numDatanodes = racks.length;
+  private static final Log LOG = LogFactory.getLog(
+          "org.apache.hadoop.dfs.TestReplication");
+
 
   private void writeFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
@@ -49,14 +60,47 @@ public class TestReplication extends TestCase {
     stm.close();
   }
   
-  
+  /* check that at least two replicas share a rack and at least two are on different racks */
   private void checkFile(FileSystem fileSys, Path name, int repl)
   throws IOException {
-    String[][] locations = fileSys.getFileCacheHints(name, 0, fileSize);
-    for (int idx = 0; idx < locations.length; idx++) {
-      assertEquals("Number of replicas for block" + idx,
-          Math.min(numDatanodes, repl), locations[idx].length);  
-    }
+      Configuration conf = fileSys.getConf();
+      ClientProtocol namenode = (ClientProtocol) RPC.getProxy(
+              ClientProtocol.class,
+              ClientProtocol.versionID,
+              DataNode.createSocketAddr(conf.get("fs.default.name")), 
+              conf);
+           
+      LocatedBlock[] locations = namenode.open(
+              DNS.getDefaultHost("default"), name.toString());
+      boolean isOnSameRack = true, isNotOnSameRack = true;
+      for (int idx = 0; idx < locations.length; idx++) {
+          DatanodeInfo[] datanodes = locations[idx].getLocations();
+          assertEquals("Number of replicas for block" + idx,
+                  Math.min(numDatanodes, repl), datanodes.length);  
+          if(datanodes.length <= 1) break;
+          if(datanodes.length == 2) {
+              isNotOnSameRack = !( datanodes[0].getNetworkLocation().equals(
+                      datanodes[1].getNetworkLocation() ) );
+              break;
+          }
+          isOnSameRack = false;
+          isNotOnSameRack = false;
+          for (int idy = 0; idy < datanodes.length-1; idy++) {
+                  LOG.info("datanode "+ idy + ": "+ datanodes[idy].getName());
+                  boolean onRack = datanodes[idy].getNetworkLocation().equals(
+                          datanodes[idy+1].getNetworkLocation() );
+                  if( onRack ) {
+                      isOnSameRack = true;
+                  }
+                  if( !onRack ) {
+                      isNotOnSameRack = true;                      
+                  }
+                  if( isOnSameRack && isNotOnSameRack ) break;
+          }
+          if( !isOnSameRack || !isNotOnSameRack ) break;
+      }
+      assertTrue(isOnSameRack);
+      assertTrue(isNotOnSameRack);
   }
   
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
@@ -70,21 +114,23 @@ public class TestReplication extends TestCase {
    */
   public void testReplication() throws IOException {
     Configuration conf = new Configuration();
-    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false);
+    MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, numDatanodes, false, true, racks);
     // Now wait for 15 seconds to give datanodes chance to register
     // themselves and to report heartbeat
     try {
-      Thread.sleep(15000L);
+         Thread.sleep(15000L);
     } catch (InterruptedException e) {
-      // nothing
+         // nothing
     }
+    
     InetSocketAddress addr = new InetSocketAddress("localhost", 65312);
     DFSClient client = new DFSClient(addr, conf);
+    
     DatanodeInfo[] info = client.datanodeReport();
     assertEquals("Number of Datanodes ", numDatanodes, info.length);
     FileSystem fileSys = cluster.getFileSystem();
     try {
-      Path file1 = new Path("smallblocktest.dat");
+      Path file1 = new Path("/smallblocktest.dat");
       writeFile(fileSys, file1, 3);
       checkFile(fileSys, file1, 3);
       cleanupFile(fileSys, file1);
@@ -97,6 +143,9 @@ public class TestReplication extends TestCase {
       writeFile(fileSys, file1, 1);
       checkFile(fileSys, file1, 1);
       cleanupFile(fileSys, file1);
+      writeFile(fileSys, file1, 2);
+      checkFile(fileSys, file1, 2);
+      cleanupFile(fileSys, file1);
     } finally {
       fileSys.close();
       cluster.shutdown();
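
The rack assertions in checkFile above reduce to a set computation over replica racks: for three or more replicas, the replicas must span more than one rack yet occupy fewer racks than there are replicas (for exactly two, the test instead requires two distinct racks). A compact, hypothetical equivalent:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.hadoop.dfs.DatanodeInfo;

    public class RackSpreadCheck {
      /* true iff replicas span >1 rack but at least two share a rack */
      static boolean spreadsAndShares(DatanodeInfo[] replicas) {
        Set<String> racks = new HashSet<String>();
        for (int i = 0; i < replicas.length; i++) {
          racks.add(replicas[i].getNetworkLocation());
        }
        return racks.size() > 1 && racks.size() < replicas.length;
      }
    }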

+ 374 - 0
src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java

@@ -0,0 +1,374 @@
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetworkTopology;
+
+import junit.framework.TestCase;
+
+public class TestReplicationPolicy extends TestCase {
+  private static final int BLOCK_SIZE = 1024;
+  private static final int NUM_OF_DATANODES = 6;
+  private static final Configuration CONF = new Configuration();
+  private static final NetworkTopology cluster;
+  private static NameNode namenode;
+  private static FSNamesystem.Replicator replicator;
+  private static DatanodeDescriptor dataNodes[] = 
+         new DatanodeDescriptor[] {
+    new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
+    new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
+    new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d2/r3"),
+    new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3")
+ };
+   
+  private final static DatanodeDescriptor NODE = 
+    new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r4");
+  
+  static {
+    try {
+      CONF.set("fs.default.name", "localhost:8020");
+      namenode = new NameNode(CONF);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    FSNamesystem fsNamesystem = FSNamesystem.getFSNamesystem();
+    replicator = fsNamesystem.replicator;
+    cluster = fsNamesystem.clusterMap;
+    // construct network topology
+    for( int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.add( dataNodes[i] );
+    }
+    for( int i=0; i<NUM_OF_DATANODES; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+    }
+  }
+  
+  /**
+   * In this testcase, client is dataNodes[0]. So the 1st replica should be
+   * placed on dataNodes[0], the 2nd replica should be placed on dataNodes[1],
+   * and the rest should be placed on different racks.
+   * The only exception is when <i>numOfReplicas</i> is 2: the 1st is on
+   * dataNodes[0] and the 2nd is on a different rack.
+   * @throws Exception
+   */
+  public void testChooseTarget1() throws Exception {
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[0]);
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        3, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    
+    targets = replicator.chooseTarget(
+        4, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[3]));
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0], but dataNodes[1] is
+   * not allowed to be chosen. So the 1st replica should be
+   * placed on dataNodes[0], the 2nd replica should be placed on a different
+   * rack, the 3rd should be on the same rack as the 2nd replica, and the rest
+   * should be placed on a third rack.
+   * @throws Exception
+   */
+  public void testChooseTarget2() throws Exception { 
+    List<DatanodeDescriptor> excludedNodes;
+    DatanodeDescriptor[] targets;
+    
+    excludedNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes.add(dataNodes[1]); 
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    excludedNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes.add(dataNodes[1]); 
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[0]);
+    
+    excludedNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes.add(dataNodes[1]); 
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    excludedNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes.add(dataNodes[1]); 
+    targets = replicator.chooseTarget(
+        3, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[0]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    
+    excludedNodes = new ArrayList<DatanodeDescriptor>();
+    excludedNodes.add(dataNodes[1]); 
+    targets = replicator.chooseTarget(
+        4, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[0]);
+    for(int i=1; i<4; i++) {
+      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+        cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+  }
+
+  /**
+   * In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
+   * to be chosen. So the 1st replica should be placed on dataNodes[1], 
+   * the 2nd replica should be placed on a different rack,
+   * the 3rd replica should be placed on the same rack as the 2nd replica,
+   * and the rest should be placed on the third rack.
+   * @throws Exception
+   */
+  public void testChooseTarget3() throws Exception {
+    // make data node 0 not qualified to be chosen
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
+      
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertEquals(targets[0], dataNodes[1]);
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertEquals(targets[0], dataNodes[1]);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        3, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertEquals(targets[0], dataNodes[1]);
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        4, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 4);
+    assertEquals(targets[0], dataNodes[1]);
+    for(int i=1; i<4; i++) {
+      assertFalse(cluster.isOnSameRack(targets[0], targets[i]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
+        cluster.isOnSameRack(targets[2], targets[3]));
+    assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
+
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); 
+  }
+  
+  /**
+   * In this testcase, client is dataNodes[0], but none of the nodes on rack 1
+   * is qualified to be chosen. So the 1st replica should be placed on either
+   * rack 2 or rack 3; 
+   * the 2nd replica should be placed on a different rack;
+   * the 3rd replica should be placed on the same rack as the 1st replica.
+   * @throws Exception
+   */
+  public void testChooseTarget4() throws Exception {
+    // make data node 0 & 1 to be not qualified to choose: not enough disk space
+    for(int i=0; i<2; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+          (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0);
+    }
+      
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        3, dataNodes[0], null, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    for(int i=0; i<3; i++) {
+      assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
+    }
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
+        cluster.isOnSameRack(targets[1], targets[2]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
+    
+    for(int i=0; i<2; i++) {
+      dataNodes[i].updateHeartbeat(
+          2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+          FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0);
+    }
+  }
+  /**
+   * In this testcase, the client is a node outside of the file system.
+   * So the 1st replica can be placed on any node; 
+   * the 2nd replica should be placed on a different rack;
+   * the 3rd replica should be placed on the same rack as the 1st replica.
+   * @throws Exception
+   */
+  public void testChooseTarget5() throws Exception {
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, NODE, null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, NODE, null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    
+    targets = replicator.chooseTarget(
+        2, NODE, null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        3, NODE, null, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
+    assertFalse(cluster.isOnSameRack(targets[0], targets[2]));    
+  }
+  
+  /**
+   * This testcase tests re-replication when dataNodes[0] is already chosen.
+   * So the 1st replica can be placed on rack 1; 
+   * the 2nd replica should be placed on a different rack;
+   * the 3rd replica can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate1() throws Exception {
+    List<DatanodeDescriptor> choosenNodes = new ArrayList<DatanodeDescriptor>();
+    choosenNodes.add(dataNodes[0]);    
+    DatanodeDescriptor[] targets;
+    
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+    
+    targets = replicator.chooseTarget(
+        3, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[2]));    
+  }
+
+  /**
+   * This testcase tests re-replication, 
+   * when dataNodes[0] and dataNodes[1] are already chosen.
+   * So the 1st replica should be placed on a rack other than rack 1; 
+   * the remaining replicas can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate2() throws Exception {
+    List<DatanodeDescriptor> choosenNodes = new ArrayList<DatanodeDescriptor>();
+    choosenNodes.add(dataNodes[0]);
+    choosenNodes.add(dataNodes[1]);
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+  }
+
+  /**
+   * This testcase tests re-replication, 
+   * when dataNodes[0] and dataNodes[2] are already chosen.
+   * So the 1st replica should be placed on rack 1; 
+   * the remaining replicas can be placed randomly.
+   * @throws Exception
+   */
+  public void testRereplicate3() throws Exception {
+    List<DatanodeDescriptor> choosenNodes = new ArrayList<DatanodeDescriptor>();
+    choosenNodes.add(dataNodes[0]);
+    choosenNodes.add(dataNodes[2]);
+    
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(
+        0, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+    
+    targets = replicator.chooseTarget(
+        1, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    
+    targets = replicator.chooseTarget(
+        2, dataNodes[0], choosenNodes, null, BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
+    assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
+  }
+}
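
The javadocs above spell out the placement contract case by case; for reference, a hedged sketch of how the chooser is driven (the helper class is hypothetical and sits in org.apache.hadoop.dfs because the test reaches replicator through package access; the five-argument chooseTarget used by the re-replication cases additionally takes the already-chosen replicas):

    package org.apache.hadoop.dfs;

    public class ChooseTargetSketch {
      /* Ask the replicator for numOfReplicas targets for a block written
       * from 'writer'; targets[0] is the first node of the write pipeline. */
      static DatanodeDescriptor[] pipeline(FSNamesystem.Replicator replicator,
                                           DatanodeDescriptor writer,
                                           int numOfReplicas,
                                           long blockSize) {
        return replicator.chooseTarget(numOfReplicas, writer, null, blockSize);
      }
    }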

+ 78 - 0
src/test/org/apache/hadoop/net/TestNetworkTopology.java

@@ -0,0 +1,78 @@
+package org.apache.hadoop.net;
+
+import java.util.HashSet;
+import org.apache.hadoop.dfs.DatanodeDescriptor;
+import org.apache.hadoop.dfs.DatanodeID;
+import junit.framework.TestCase;
+
+public class TestNetworkTopology extends TestCase {
+  private NetworkTopology cluster = new NetworkTopology();
+  private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
+      new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h2:5020", "0", -1), "/d1/r1"),
+      new DatanodeDescriptor(new DatanodeID("h3:5020", "0", -1), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h4:5020", "0", -1), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h5:5020", "0", -1), "/d1/r2"),
+      new DatanodeDescriptor(new DatanodeID("h6:5020", "0", -1), "/d2/r3"),
+      new DatanodeDescriptor(new DatanodeID("h7:5020", "0", -1), "/d2/r3")
+  };
+  private final static DatanodeDescriptor NODE = 
+    new DatanodeDescriptor(new DatanodeID("h8:5020", "0", -1), "/d2/r4");
+  
+  public TestNetworkTopology() {
+    for(int i=0; i<dataNodes.length; i++) {
+      cluster.add( dataNodes[i] );
+    }    
+  }
+  
+  public void testContains() {
+    for(int i=0; i<dataNodes.length; i++) {
+      assertTrue( cluster.contains(dataNodes[i]));
+    }
+    assertFalse( cluster.contains( NODE ));
+  }
+  
+  public void testNumOfChildren() throws Exception {
+    assertEquals(cluster.getNumOfLeaves(), dataNodes.length);
+  }
+
+  public void testNumOfRacks() throws Exception {
+    assertEquals(cluster.getNumOfRacks(), 3);
+  }
+  
+  public void testGetLeaves() throws Exception {
+    DatanodeDescriptor [] leaves = cluster.getLeaves(NodeBase.ROOT);
+    assertEquals(leaves.length, dataNodes.length);
+    HashSet<DatanodeDescriptor> set1 = 
+      new HashSet<DatanodeDescriptor>(leaves.length);
+    HashSet<DatanodeDescriptor> set2 = 
+      new HashSet<DatanodeDescriptor>(dataNodes.length);
+    for(int i=0; i<leaves.length; i++) {
+      set1.add(leaves[i]);
+      set2.add(dataNodes[i]);
+    }
+    assertTrue(set1.equals(set2));
+  }
+  
+  public void testGetDistance() throws Exception {
+    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[0]), 0);
+    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2);
+    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[3]), 4);
+    assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6);
+  }
+
+  public void testRemove() throws Exception {
+    for(int i=0; i<dataNodes.length; i++) {
+      cluster.remove( dataNodes[i] );
+    }
+    for(int i=0; i<dataNodes.length; i++) {
+      assertFalse( cluster.contains( dataNodes[i] ) );
+    }
+    assertEquals(0, cluster.getNumOfLeaves());
+    for(int i=0; i<dataNodes.length; i++) {
+      cluster.add( dataNodes[i] );
+    }
+  }
+}