
HADOOP-1985. This addresses rack-awareness for Map tasks and for HDFS in a uniform way. Contributed by Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@632035 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das, 17 years ago
Parent commit: 95c09bab6d
34 changed files with 1,471 additions and 231 deletions
  1. CHANGES.txt  +3 -0
  2. conf/hadoop-default.xml  +39 -9
  3. docs/cluster_setup.html  +28 -2
  4. docs/cluster_setup.pdf  +15 -4
  5. docs/hadoop-default.html  +28 -9
  6. docs/hdfs_design.html  +1 -1
  7. docs/hdfs_design.pdf  +20 -3
  8. src/docs/src/documentation/content/xdocs/cluster_setup.xml  +22 -0
  9. src/docs/src/documentation/content/xdocs/hdfs_design.xml  +1 -1
  10. src/docs/src/documentation/content/xdocs/site.xml  +5 -0
  11. src/java/org/apache/hadoop/dfs/DataNode.java  +62 -103
  12. src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java  +16 -0
  13. src/java/org/apache/hadoop/dfs/DatanodeInfo.java  +7 -5
  14. src/java/org/apache/hadoop/dfs/DatanodeProtocol.java  +6 -4
  15. src/java/org/apache/hadoop/dfs/FSNamesystem.java  +92 -6
  16. src/java/org/apache/hadoop/dfs/NameNode.java  +14 -3
  17. src/java/org/apache/hadoop/mapred/JobInProgress.java  +157 -36
  18. src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties  +1 -0
  19. src/java/org/apache/hadoop/mapred/JobTracker.java  +122 -2
  20. src/java/org/apache/hadoop/mapred/Task.java  +12 -0
  21. src/java/org/apache/hadoop/mapred/TaskInProgress.java  +14 -7
  22. src/java/org/apache/hadoop/mapred/TaskTracker.java  +19 -1
  23. src/java/org/apache/hadoop/net/DNSToSwitchMapping.java  +42 -0
  24. src/java/org/apache/hadoop/net/NetUtils.java  +60 -0
  25. src/java/org/apache/hadoop/net/NetworkTopology.java  +12 -5
  26. src/java/org/apache/hadoop/net/Node.java  +2 -0
  27. src/java/org/apache/hadoop/net/NodeBase.java  +3 -0
  28. src/java/org/apache/hadoop/net/ScriptBasedMapping.java  +139 -0
  29. src/test/org/apache/hadoop/dfs/MiniDFSCluster.java  +160 -17
  30. src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java  +20 -4
  31. src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java  +1 -1
  32. src/test/org/apache/hadoop/mapred/MiniMRCluster.java  +95 -8
  33. src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java  +191 -0
  34. src/test/org/apache/hadoop/net/StaticMapping.java  +62 -0

+ 3 - 0
CHANGES.txt

@@ -20,6 +20,9 @@ Trunk (unreleased changes)
     Disk layout version changed from -12 to -13. See changelist 630992
     (dhruba)
 
+    HADOOP-1985.  This addresses rack-awareness for Map tasks and for 
+    HDFS in a uniform way. (ddas)
+
   NEW FEATURES
 
     HADOOP-1398.  Add HBase in-memory block cache.  (tomwhite)

+ 39 - 9
conf/hadoop-default.xml

@@ -417,15 +417,6 @@ creations/deletions), or "all".</description>
  	</description>
 </property>
 
-<property>
-  <name>dfs.network.script</name>
-  <value></value>
-  <description>
-        Specifies a script name that print the network location path
-        of the current machine.
-  </description>
-</property>
-
 <property>
   <name>dfs.balance.bandwidthPerSec</name>
   <value>1048576</value>
@@ -1149,4 +1140,43 @@ creations/deletions), or "all".</description>
   </description>
 </property>
 
+<!-- Rack Configuration -->
+
+<property>
+  <name>topology.node.switch.mapping.impl</name>
+  <value>org.apache.hadoop.net.ScriptBasedMapping</value>
+  <description> The default implementation of the DNSToSwitchMapping. It
+    invokes a script specified in topology.script.file.name to resolve
+    node names. If the value for topology.script.file.name is not set, the
+    default value of DEFAULT_RACK is returned for all node names.
+  </description>
+</property>
+
+<property>
+  <name>topology.script.file.name</name>
+  <value></value>
+  <description> The script name that should be invoked to resolve DNS names to
+    NetworkTopology names. Example: the script would take host.foo.bar as an
+    argument, and return /rack1 as the output.
+  </description>
+</property>
+
+<property>
+  <name>topology.script.number.args</name>
+  <value>20</value>
+  <description> The max number of args that the script configured with 
+    topology.script.file.name should be run with. Each arg is an
+    IP address.
+  </description>
+</property>
+
+<property>
+  <name>mapred.task.cache.levels</name>
+  <value>2</value>
+  <description> This is the max level of the task cache. For example, if
+    the level is 2, the tasks cached are at the host level and at the rack
+    level.
+  </description>
+</property>
+
 </configuration>
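
For clusters that prefer to resolve racks in Java rather than through an external script, topology.node.switch.mapping.impl can name a custom class. Below is a minimal sketch of such a plug-in; it assumes only the resolve(java.util.List) contract of org.apache.hadoop.net.DNSToSwitchMapping introduced by this patch, and the hard-coded host/rack table is purely illustrative.

// Illustrative example only; not part of this patch.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;

public class TableMapping implements DNSToSwitchMapping {
  // Hypothetical static table; a real implementation would load this
  // from configuration or a file.
  private final Map<String, String> table = new HashMap<String, String>();

  public TableMapping() {
    table.put("host1.foo.bar", "/rack1");
    table.put("host2.foo.bar", "/rack1");
    table.put("host3.foo.bar", "/rack2");
  }

  // Resolve each host name (or IP) to a rack id; unknown hosts fall back
  // to the default rack, mirroring the documented behaviour when no
  // topology script is configured.
  public List<String> resolve(List<String> names) {
    List<String> racks = new ArrayList<String>(names.size());
    for (String name : names) {
      String rack = table.get(name);
      racks.add(rack != null ? rack : NetworkTopology.DEFAULT_RACK);
    }
    return racks;
  }
}

Wiring such a class in would only require setting topology.node.switch.mapping.impl to its name in the site configuration.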

+ 28 - 2
docs/cluster_setup.html

@@ -204,6 +204,9 @@ document.write("Last Published: " + document.lastModified);
 </ul>
 </li>
 <li>
+<a href="#Hadoop+Rack+Awareness">Hadoop Rack Awareness</a>
+</li>
+<li>
 <a href="#Hadoop+Startup">Hadoop Startup</a>
 </li>
 <li>
@@ -656,7 +659,30 @@ document.write("Last Published: " + document.lastModified);
 </div>
     
     
-<a name="N10343"></a><a name="Hadoop+Startup"></a>
+<a name="N10343"></a><a name="Hadoop+Rack+Awareness"></a>
+<h2 class="h3">Hadoop Rack Awareness</h2>
+<div class="section">
+<p>Both the HDFS and Map-Reduce components are rack-aware.</p>
+<p>The <span class="codefrag">NameNode</span> and the <span class="codefrag">JobTracker</span> obtain the
+      <span class="codefrag">rack id</span> of the slaves in the cluster by invoking the 
+      <a href="api/org/apache/hadoop/net/DNSToSwitchMapping.html#resolve(java.util.List)">resolve</a> API of an administrator-configured
+      module. The API resolves the slave's DNS name (or IP address) to a 
+      rack id. The module to use can be selected with the configuration
+      item <span class="codefrag">topology.node.switch.mapping.impl</span>. The default 
+      implementation runs a script/command configured via 
+      <span class="codefrag">topology.script.file.name</span>. If topology.script.file.name is
+      not set, the rack id <span class="codefrag">/default-rack</span> is returned for any 
+      passed IP address. An additional Map-Reduce configuration item,
+      <span class="codefrag">mapred.task.cache.levels</span>, determines the number
+      of levels (in the network topology) of task caches. For example, with
+      the default value of 2, two levels of caches are constructed - 
+      one for hosts (host -&gt; task mapping) and another for racks 
+      (rack -&gt; task mapping).
+      </p>
+</div>
+    
+    
+<a name="N10369"></a><a name="Hadoop+Startup"></a>
 <h2 class="h3">Hadoop Startup</h2>
 <div class="section">
 <p>To start a Hadoop cluster you will need to start both the HDFS and 
@@ -691,7 +717,7 @@ document.write("Last Published: " + document.lastModified);
 </div>
     
     
-<a name="N10389"></a><a name="Hadoop+Shutdown"></a>
+<a name="N103AF"></a><a name="Hadoop+Shutdown"></a>
 <h2 class="h3">Hadoop Shutdown</h2>
 <div class="section">
 <p>
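
To make the control flow concrete for readers jumping ahead to the JobTracker and FSNamesystem changes below, here is a simplified, self-contained restatement of how a resolved host ends up as a node in the shared NetworkTopology. It mirrors resolveAndAddToTopology in JobTracker.java further down; it is not additional patch code, and the helper class name is made up.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

// Simplified illustration of how a host name is resolved to a rack and
// inserted into the NetworkTopology used for task placement.
class TopologyExample {
  static Node addHost(DNSToSwitchMapping mapping, NetworkTopology clusterMap,
                      String host) {
    // resolve() takes a batch of names; here we pass a single host.
    List<String> racks = mapping.resolve(Collections.singletonList(host));
    if (racks == null || racks.isEmpty()) {
      return null;                              // resolution failed
    }
    String networkLoc = NodeBase.normalize(racks.get(0));   // e.g. "/rack1"
    Node node = clusterMap.getNode(networkLoc + "/" + host);
    if (node == null) {                         // first time this host is seen
      node = new NodeBase(host, networkLoc);
      clusterMap.add(node);
    }
    return node;
  }
}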

Changes are not shown because the file is too large.
+ 15 - 4
docs/cluster_setup.pdf


+ 28 - 9
docs/hadoop-default.html

@@ -17,9 +17,9 @@
 <td><a name="hadoop.logfile.count">hadoop.logfile.count</a></td><td>10</td><td>The max number of log files</td>
 </tr>
 <tr>
-<td><a name="hadoop.job.history.location">hadoop.job.history.location</a></td><td>file://${hadoop.log.dir}/history</td><td> If job tracker is static the history files are stored 
-  in this single well known place. By default, it is in the local 
-  file system at ${hadoop.log.dir}/history.
+<td><a name="hadoop.job.history.location">hadoop.job.history.location</a></td><td></td><td> If job tracker is static the history files are stored 
+  in this single well known place. If no value is set here, by default,
+  it is in the local file system at ${hadoop.log.dir}/history.
   </td>
 </tr>
 <tr>
@@ -250,12 +250,6 @@ creations/deletions), or "all".</td>
  	</td>
 </tr>
 <tr>
-<td><a name="dfs.network.script">dfs.network.script</a></td><td></td><td>
-        Specifies a script name that print the network location path
-        of the current machine.
-  </td>
-</tr>
-<tr>
 <td><a name="dfs.balance.bandwidthPerSec">dfs.balance.bandwidthPerSec</a></td><td>1048576</td><td>
         Specifies the maximum amount of bandwidth that each datanode
         can utilize for the balancing purpose in term of
@@ -696,6 +690,31 @@ creations/deletions), or "all".</td>
     SocksSocketFactory.
   </td>
 </tr>
+<tr>
+<td><a name="topology.node.switch.mapping.impl">topology.node.switch.mapping.impl</a></td><td>org.apache.hadoop.net.ScriptBasedMapping</td><td> The default implementation of the DNSToSwitchMapping. It
+    invokes a script specified in topology.script.file.name to resolve
+    node names. If the value for topology.script.file.name is not set, the
+    default value of DEFAULT_RACK is returned for all node names.
+  </td>
+</tr>
+<tr>
+<td><a name="topology.script.file.name">topology.script.file.name</a></td><td></td><td> The script name that should be invoked to resolve DNS names to
+    NetworkTopology names. Example: the script would take host.foo.bar as an
+    argument, and return /rack1 as the output.
+  </td>
+</tr>
+<tr>
+<td><a name="topology.script.number.args">topology.script.number.args</a></td><td>20</td><td> The max number of args that the script configured with 
+    topology.script.file.name should be run with. Each arg is an
+    IP address.
+  </td>
+</tr>
+<tr>
+<td><a name="mapred.task.cache.levels">mapred.task.cache.levels</a></td><td>2</td><td> This is the max level of the task cache. For example, if
+    the level is 2, the tasks cached are at the host level and at the rack
+    level.
+  </td>
+</tr>
 </table>
 </body>
 </html>

+ 1 - 1
docs/hdfs_design.html

@@ -389,7 +389,7 @@ document.write("Last Published: " + document.lastModified);
         Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks.  
         </p>
 <p>
-        At startup time, each Datanode determines the rack it belongs to and notifies the Namenode of its rack id upon registration. HDFS provides <acronym title="Application Programming Interface">API</acronym>s to facilitate pluggable modules that can be used to determine the rack id of a machine. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks. 
+        The NameNode determines the rack id each DataNode belongs to via the process outlined in <a href="cluster_setup.html#Hadoop+Rack+Awareness">Rack Awareness</a>. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks. 
         </p>
 <p>
         For the common case, when the replication factor is three, HDFS&rsquo;s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.
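
As a toy worked example of that default policy (a standalone sketch, not the actual HDFS replica chooser): with hypothetical hosts h1..h3 on /rack1 and h4, h5 on /rack2, a writer on h1 would typically see replicas placed on h1, another /rack1 node, and one /rack2 node.

// Standalone illustration of the three-replica placement described above;
// host and rack names are hypothetical and this is not HDFS code.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PlacementExample {
  public static void main(String[] args) {
    Map<String, List<String>> rackToHosts = new TreeMap<String, List<String>>();
    rackToHosts.put("/rack1", Arrays.asList("h1", "h2", "h3"));
    rackToHosts.put("/rack2", Arrays.asList("h4", "h5"));

    String writerRack = "/rack1";
    String writerHost = "h1";

    // Replica 1: the writer's own node.
    String first = writerHost;

    // Replica 2: a different node on the writer's rack.
    String second = null;
    for (String h : rackToHosts.get(writerRack)) {
      if (!h.equals(first)) { second = h; break; }
    }

    // Replica 3: any node on a different rack.
    String third = null;
    for (Map.Entry<String, List<String>> e : rackToHosts.entrySet()) {
      if (!e.getKey().equals(writerRack)) { third = e.getValue().get(0); break; }
    }

    // Prints "h1, h2, h4": two replicas on /rack1, one on /rack2.
    System.out.println(first + ", " + second + ", " + third);
  }
}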

Changes are not shown because the file is too large.
+ 20 - 3
docs/hdfs_design.pdf


+ 22 - 0
src/docs/src/documentation/content/xdocs/cluster_setup.xml

@@ -402,6 +402,28 @@
       typically <code>${HADOOP_HOME}/conf</code>.</p>
     </section>
     
+    <section>
+      <title>Hadoop Rack Awareness</title>
+      <p>Both the HDFS and Map-Reduce components are rack-aware.</p>
+      <p>The <code>NameNode</code> and the <code>JobTracker</code> obtain the
+      <code>rack id</code> of the slaves in the cluster by invoking the 
+      <a href="ext:api/org/apache/hadoop/net/dnstoswitchmapping/resolve
+      ">resolve</a> API of an administrator-configured
+      module. The API resolves the slave's DNS name (or IP address) to a 
+      rack id. The module to use can be selected with the configuration
+      item <code>topology.node.switch.mapping.impl</code>. The default 
+      implementation runs a script/command configured via 
+      <code>topology.script.file.name</code>. If topology.script.file.name is
+      not set, the rack id <code>/default-rack</code> is returned for any 
+      passed IP address. An additional Map-Reduce configuration item,
+      <code>mapred.task.cache.levels</code>, determines the number
+      of levels (in the network topology) of task caches. For example, with
+      the default value of 2, two levels of caches are constructed - 
+      one for hosts (host -> task mapping) and another for racks 
+      (rack -> task mapping).
+      </p>
+    </section>
+    
     <section>
       <title>Hadoop Startup</title>
       

+ 1 - 1
src/docs/src/documentation/content/xdocs/hdfs_design.xml

@@ -136,7 +136,7 @@
         Large HDFS instances run on a cluster of computers that commonly spread across many racks. Communication between two nodes in different racks has to go through switches. In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks.  
         </p>
         <p>
-        At startup time, each Datanode determines the rack it belongs to and notifies the Namenode of its rack id upon registration. HDFS provides <acronym title="Application Programming Interface">API</acronym>s to facilitate pluggable modules that can be used to determine the rack id of a machine. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks. 
+        The NameNode determines the rack id each DataNode belongs to via the process outlined in <a href="cluster_setup.html#Hadoop+Rack+Awareness">Rack Awareness</a>. A simple but non-optimal policy is to place replicas on unique racks. This prevents losing data when an entire rack fails and allows use of bandwidth from multiple racks when reading data. This policy evenly distributes replicas in the cluster which makes it easy to balance load on component failure. However, this policy increases the cost of writes because a write needs to transfer blocks to multiple racks. 
         </p>
         <p>
         For the common case, when the replication factor is three, HDFS&#x2019;s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

+ 5 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -193,6 +193,11 @@ See http://forrest.apache.org/docs/linking.html for more info.
                 <package-summary href="package-summary.html" />
               </pipes>
             </mapred>
+            <net href="net/">
+              <dnstoswitchmapping href="DNSToSwitchMapping.html">
+              <resolve href="#resolve(java.util.List)" />
+              </dnstoswitchmapping>
+            </net>
             <streaming href="streaming/">
               <package-summary href="package-summary.html" />
             </streaming>

+ 62 - 103
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -26,12 +26,10 @@ import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.mapred.StatusHttpServer;
-import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.dfs.BlockCommand;
 import org.apache.hadoop.dfs.DatanodeProtocol;
 import org.apache.hadoop.dfs.FSDatasetInterface.MetaDataInputStream;
@@ -90,7 +88,7 @@ public class DataNode implements FSConstants, Runnable {
   DatanodeProtocol namenode = null;
   FSDatasetInterface data = null;
   DatanodeRegistration dnRegistration = null;
-  private String networkLoc;
+
   volatile boolean shouldRun = true;
   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
   private LinkedList<String> delHints = new LinkedList<String>();
@@ -102,15 +100,18 @@ public class DataNode implements FSConstants, Runnable {
   long lastBlockReport = 0;
   boolean resetBlockReportTime = true;
   long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
+  boolean mustReportBlocks = false;
   long lastHeartbeat = 0;
   long heartBeatInterval;
   private DataStorage storage = null;
   private StatusHttpServer infoServer = null;
   private DataNodeMetrics myMetrics;
   private static InetSocketAddress nameNodeAddr;
+  private InetSocketAddress selfAddr;
   private static DataNode datanodeObject = null;
   private Thread dataNodeThread = null;
   String machineName;
+  private static String dnThreadName;
   int defaultBytesPerChecksum = 512;
   private int socketTimeout;
   
@@ -181,9 +182,14 @@ public class DataNode implements FSConstants, Runnable {
                      AbstractList<File> dataDirs
                      ) throws IOException {
     // use configured nameserver & interface to get local hostname
-    machineName = DNS.getDefaultHost(
+    if (conf.get("slave.host.name") != null) {
+      machineName = conf.get("slave.host.name");   
+    }
+    if (machineName == null) {
+      machineName = DNS.getDefaultHost(
                                      conf.get("dfs.datanode.dns.interface","default"),
                                      conf.get("dfs.datanode.dns.nameserver","default"));
+    }
     InetSocketAddress nameNodeAddr = NetUtils.createSocketAddr(
                                      conf.get("fs.default.name", "local"));
     
@@ -246,6 +252,8 @@ public class DataNode implements FSConstants, Runnable {
     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
     // adjust machine name with the actual port
     tmpPort = ss.getLocalPort();
+    selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
+                                     tmpPort);
     this.dnRegistration.setName(machineName + ":" + tmpPort);
     LOG.info("Opened server at " + tmpPort);
       
@@ -304,12 +312,6 @@ public class DataNode implements FSConstants, Runnable {
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
-    // get network location
-    this.networkLoc = conf.get("dfs.datanode.rack");
-    if (networkLoc == null)  // exec network script or set the default rack
-      networkLoc = getNetworkLoc(conf);
-    // register datanode
-    register();
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
   }
 
@@ -356,6 +358,10 @@ public class DataNode implements FSConstants, Runnable {
   public InetSocketAddress getNameNodeAddr() {
     return nameNodeAddr;
   }
+  
+  public InetSocketAddress getSelfAddr() {
+    return selfAddr;
+  }
     
   DataNodeMetrics getMetrics() {
     return myMetrics;
@@ -418,7 +424,7 @@ public class DataNode implements FSConstants, Runnable {
       try {
         // reset name to machineName. Mainly for web interface.
         dnRegistration.name = machineName + ":" + dnRegistration.getPort();
-        dnRegistration = namenode.register(dnRegistration, networkLoc);
+        dnRegistration = namenode.register(dnRegistration);
         break;
       } catch(SocketTimeoutException e) {  // namenode is busy
         LOG.info("Problem connecting to server: " + getNameNodeAddr());
@@ -632,12 +638,14 @@ public class DataNode implements FSConstants, Runnable {
         }
 
         // send block report
-        if (startTime - lastBlockReport > blockReportInterval) {
+        if (mustReportBlocks || 
+            startTime - lastBlockReport > blockReportInterval) {
           //
           // Send latest blockinfo report if timer has expired.
           // Get back a list of local block(s) that are obsolete
           // and can be safely GC'ed.
           //
+          mustReportBlocks = false;
           long brStartTime = now();
           Block[] bReport = data.getBlockReport();
           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
@@ -739,6 +747,16 @@ public class DataNode implements FSConstants, Runnable {
       // start distributed upgrade here
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
+    case DatanodeProtocol.DNA_BLOCKREPORT:
+      try {
+        if (initialBlockReportDelay != 0) {
+          //sleep for a random time upto the heartbeat interval 
+          //before sending the block report
+          Thread.sleep((long)(new Random().nextDouble() * heartBeatInterval));
+        }
+        mustReportBlocks = true;
+      } catch (InterruptedException ie) {}
+      return false;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -2447,32 +2465,49 @@ public class DataNode implements FSConstants, Runnable {
     LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
   }
     
-  /** Start datanode daemon.
+  /** Start a single datanode daemon and wait for it to finish.
+   *  If this thread is specifically interrupted, it will stop waiting.
    */
-  public static DataNode run(Configuration conf) throws IOException {
-    String[] dataDirs = conf.getStrings("dfs.data.dir");
-    DataNode dn = makeInstance(dataDirs, conf);
+  static void runDatanodeDaemon(DataNode dn) throws IOException {
     if (dn != null) {
-      dn.dataNodeThread = new Thread(dn, "DataNode: [" +
-                                  StringUtils.arrayToString(dataDirs) + "]");
+      //register datanode
+      dn.register();
+      dn.dataNodeThread = new Thread(dn, dnThreadName);
       dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
       dn.dataNodeThread.start();
     }
-    return dn;
   }
 
-  /** Start a single datanode daemon and wait for it to finish.
-   *  If this thread is specifically interrupted, it will stop waiting.
+  /** Instantiate a single datanode object. This must be run by invoking
+   *  {@link DataNode#runDatanodeDaemon(DataNode)} subsequently. 
    */
-  static DataNode createDataNode(String args[],
-                                 Configuration conf) throws IOException {
+  static DataNode instantiateDataNode(String args[],
+                                      Configuration conf) throws IOException {
     if (conf == null)
       conf = new Configuration();
     if (!parseArguments(args, conf)) {
       printUsage();
       return null;
     }
-    return run(conf);
+    if (conf.get("dfs.network.script") != null) {
+      LOG.error("This configuration for rack identification is not supported" +
+          " anymore. RackID resolution is handled by the NameNode.");
+      System.exit(-1);
+    }
+    String[] dataDirs = conf.getStrings("dfs.data.dir");
+    dnThreadName = "DataNode: [" +
+                        StringUtils.arrayToString(dataDirs) + "]";
+    return makeInstance(dataDirs, conf);
+  }
+
+  /** Instantiate & Start a single datanode daemon and wait for it to finish.
+   *  If this thread is specifically interrupted, it will stop waiting.
+   */
+  static DataNode createDataNode(String args[],
+                                 Configuration conf) throws IOException {
+    DataNode dn = instantiateDataNode(args, conf);
+    runDatanodeDaemon(dn);
+    return dn;
   }
 
   void join() {
@@ -2524,7 +2559,6 @@ public class DataNode implements FSConstants, Runnable {
   
   private static void printUsage() {
     System.err.println("Usage: java DataNode");
-    System.err.println("           [-r, --rack <network location>] |");
     System.err.println("           [-rollback]");
   }
 
@@ -2537,15 +2571,12 @@ public class DataNode implements FSConstants, Runnable {
                                         Configuration conf) {
     int argsLen = (args == null) ? 0 : args.length;
     StartupOption startOpt = StartupOption.REGULAR;
-    String networkLoc = null;
     for(int i=0; i < argsLen; i++) {
       String cmd = args[i];
       if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
-        if (i==args.length-1)
-          return false;
-        networkLoc = args[++i];
-        if (networkLoc.startsWith("-"))
-          return false;
+        LOG.error("-r, --rack arguments are not supported anymore. RackID " +
+            "resolution is handled by the NameNode.");
+        System.exit(-1);
       } else if ("-rollback".equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.ROLLBACK;
       } else if ("-regular".equalsIgnoreCase(cmd)) {
@@ -2553,8 +2584,6 @@ public class DataNode implements FSConstants, Runnable {
       } else
         return false;
     }
-    if (networkLoc != null)
-      conf.set("dfs.datanode.rack", NodeBase.normalize(networkLoc));
     setStartupOption(conf, startOpt);
     return true;
   }
@@ -2568,76 +2597,6 @@ public class DataNode implements FSConstants, Runnable {
                                           StartupOption.REGULAR.toString()));
   }
 
-  /* Get the network location by running a script configured in conf */
-  private static String getNetworkLoc(Configuration conf) 
-    throws IOException {
-    String locScript = conf.get("dfs.network.script");
-    if (locScript == null) 
-      return NetworkTopology.DEFAULT_RACK;
-
-    LOG.info("Starting to run script to get datanode network location");
-    Process p = Runtime.getRuntime().exec(locScript);
-    StringBuffer networkLoc = new StringBuffer();
-    final BufferedReader inR = new BufferedReader(
-                                                  new InputStreamReader(p.getInputStream()));
-    final BufferedReader errR = new BufferedReader(
-                                                   new InputStreamReader(p.getErrorStream()));
-
-    // read & log any error messages from the running script
-    Thread errThread = new Thread() {
-        @Override
-        public void start() {
-          try {
-            String errLine = errR.readLine();
-            while(errLine != null) {
-              LOG.warn("Network script error: "+errLine);
-              errLine = errR.readLine();
-            }
-          } catch(IOException e) {
-                    
-          }
-        }
-      };
-    try {
-      errThread.start();
-            
-      // fetch output from the process
-      String line = inR.readLine();
-      while(line != null) {
-        networkLoc.append(line);
-        line = inR.readLine();
-      }
-      try {
-        // wait for the process to finish
-        int returnVal = p.waitFor();
-        // check the exit code
-        if (returnVal != 0) {
-          throw new IOException("Process exits with nonzero status: "+locScript);
-        }
-      } catch (InterruptedException e) {
-        throw new IOException(e.getMessage());
-      } finally {
-        try {
-          // make sure that the error thread exits
-          errThread.join();
-        } catch (InterruptedException je) {
-          LOG.warn(StringUtils.stringifyException(je));
-        }
-      }
-    } finally {
-      // close in & error streams
-      try {
-        inR.close();
-      } catch (IOException ine) {
-        throw ine;
-      } finally {
-        errR.close();
-      }
-    }
-
-    return networkLoc.toString();
-  }
-  
   /**
    * This methods  arranges for the data node to send the block report at the next heartbeat.
    */

+ 16 - 0
src/java/org/apache/hadoop/dfs/DatanodeDescriptor.java

@@ -47,6 +47,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   List<Block> replicateBlocks;
   List<DatanodeDescriptor[]> replicateTargetSets;
   Set<Block> invalidateBlocks;
+  boolean processedBlockReport = false;
   
   /** Default constructor */
   public DatanodeDescriptor() {
@@ -259,6 +260,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return invalidateBlocks.size();
     }
   }
+  
+  /**
+   * Set the bit signifying that the first block report from this datanode has been 
+   * processed
+   */
+  void setBlockReportProcessed(boolean val) {
+    processedBlockReport = val;
+  }
+  
+  /**
+   * Have we processed any block report from this datanode yet
+   */
+  boolean getBlockReportProcessed() {
+    return processedBlockReport;
+  }
 
   /**
    * Remove the specified number of target sets

+ 7 - 5
src/java/org/apache/hadoop/dfs/DatanodeInfo.java

@@ -44,7 +44,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   protected long remaining;
   protected long lastUpdate;
   protected int xceiverCount;
-  private String location = NetworkTopology.DEFAULT_RACK;
+  private String location = NetworkTopology.UNRESOLVED;
 
   /** HostName as suplied by the datanode during registration as its 
    * name. Namenode uses datanode IP address as the name.
@@ -125,10 +125,10 @@ public class DatanodeInfo extends DatanodeID implements Node {
   }
 
   /** rack name **/
-  public String getNetworkLocation() {return location;}
+  public synchronized String getNetworkLocation() {return location;}
     
   /** Sets the rack name */
-  void setNetworkLocation(String location) {
+  public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }
   
@@ -147,7 +147,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
     long r = getRemaining();
     long u = getDfsUsed();
     buffer.append("Name: "+name+"\n");
-    if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
+    if (!NetworkTopology.UNRESOLVED.equals(location) && 
+        !NetworkTopology.DEFAULT_RACK.equals(location)) {
       buffer.append("Rack: "+location+"\n");
     }
     if (isDecommissioned()) {
@@ -172,7 +173,8 @@ public class DatanodeInfo extends DatanodeID implements Node {
     long r = getRemaining();
     long u = getDfsUsed();
     buffer.append(name);
-    if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
+    if (!NetworkTopology.UNRESOLVED.equals(location) &&
+        !NetworkTopology.DEFAULT_RACK.equals(location)) {
       buffer.append(" "+location);
     }
     if (isDecommissioned()) {

+ 6 - 4
src/java/org/apache/hadoop/dfs/DatanodeProtocol.java

@@ -31,10 +31,12 @@ import org.apache.hadoop.ipc.VersionedProtocol;
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
+   * 12: removed the rack ID from registration;
+   * 12: added DNA_BLOCKREPORT
    * 11 : reportBadBlocks() is added.
    * 11 Block reports as long[]
    */
-  public static final long versionID = 11L;
+  public static final long versionID = 12L;
   
   // error code
   final static int NOTIFY = 0;
@@ -51,19 +53,19 @@ interface DatanodeProtocol extends VersionedProtocol {
   final static int DNA_SHUTDOWN = 3;   // shutdown node
   final static int DNA_REGISTER = 4;   // re-register
   final static int DNA_FINALIZE = 5;   // finalize previous upgrade
+  final static int DNA_BLOCKREPORT = 6;   // request a block report
 
   /** 
    * Register Datanode.
    *
    * @see org.apache.hadoop.dfs.DataNode#register()
-   * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration, String)
+   * @see org.apache.hadoop.dfs.FSNamesystem#registerDatanode(DatanodeRegistration)
    * 
    * @return updated {@link org.apache.hadoop.dfs.DatanodeRegistration}, which contains 
    * new storageID if the datanode did not have one and
    * registration ID for further communication.
    */
-  public DatanodeRegistration register(DatanodeRegistration registration,
-                                       String networkLocation
+  public DatanodeRegistration register(DatanodeRegistration registration
                                        ) throws IOException;
   /**
    * sendHeartbeat() tells the NameNode that the DataNode is still

+ 92 - 6
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -27,8 +27,10 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.StatusHttpServer;
 import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.ipc.Server;
@@ -42,6 +44,7 @@ import java.io.DataOutputStream;
 import java.net.InetSocketAddress;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.text.SimpleDateFormat;
 
 import javax.management.NotCompliantMBeanException;
@@ -175,6 +178,8 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
   Daemon lmthread = null;   // LeaseMonitor thread
   Daemon smmthread = null;  // SafeModeMonitor thread
   Daemon replthread = null;  // Replication thread
+  Daemon resthread = null; //ResolutionMonitor thread
+  
   volatile boolean fsRunning = true;
   long systemStart = 0;
 
@@ -208,6 +213,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     
   // datanode networktoplogy
   NetworkTopology clusterMap = new NetworkTopology();
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private LinkedBlockingQueue<DatanodeDescriptor> resolutionQueue = 
+    new LinkedBlockingQueue <DatanodeDescriptor>();
+  
   // for block replicas placement
   ReplicationTargetChooser replicator;
 
@@ -271,9 +280,11 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(new LeaseMonitor());
     this.replthread = new Daemon(new ReplicationMonitor());
+    this.resthread = new Daemon(new ResolutionMonitor());
     hbthread.start();
     lmthread.start();
     replthread.start();
+    resthread.start();
     
     this.registerMBean(); // register the MBean for the FSNamesystemStutus
 
@@ -283,6 +294,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     this.dnthread = new Daemon(new DecommissionedMonitor());
     dnthread.start();
 
+    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+
     String infoAddr = 
       NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
                                 "dfs.info.port", "dfs.http.address");
@@ -419,6 +434,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       if (infoServer != null) infoServer.stop();
       if (hbthread != null) hbthread.interrupt();
       if (replthread != null) replthread.interrupt();
+      if (resthread != null) resthread.interrupt();
       if (dnthread != null) dnthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
     } catch (InterruptedException ie) {
@@ -1835,6 +1851,72 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return dir.getListing(src);
   }
 
+  public void addToResolutionQueue(DatanodeDescriptor d) {
+    while (!resolutionQueue.add(d)) {
+      LOG.warn("Couldn't add to the Resolution queue now. Will " +
+               "try again");
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException ie) {}
+    }
+  }
+  
+  private class ResolutionMonitor implements Runnable {
+    public void run() {
+      try {
+        while (fsRunning) {
+          int size;
+          if((size = resolutionQueue.size()) == 0) {
+            Thread.sleep(2000);
+            continue;
+          }
+          List <DatanodeDescriptor> datanodes = 
+            new ArrayList<DatanodeDescriptor>(size);
+          resolutionQueue.drainTo(datanodes);
+          List<String> dnHosts = new ArrayList<String>(size);
+          for (DatanodeDescriptor d : datanodes) {
+            dnHosts.add(d.getName());
+          }
+          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
+          if (rName == null) {
+            continue;
+          }
+          int i = 0;
+          for (String m : rName) {
+            DatanodeDescriptor d = datanodes.get(i++); 
+            d.setNetworkLocation(m);
+            clusterMap.add(d);
+          }
+        }
+      } catch (Exception e) {
+        FSNamesystem.LOG.error(StringUtils.stringifyException(e));
+      }
+    }
+  }
+  
+  /**
+   * Has the block report of the datanode represented by nodeReg been processed
+   * yet.
+   * @param nodeReg
+   * @return true or false
+   */
+  public synchronized boolean blockReportProcessed(DatanodeRegistration nodeReg)
+  throws IOException {
+    return getDatanode(nodeReg).getBlockReportProcessed();
+  }
+  
+  /**
+   * Has the datanode been resolved to a switch/rack
+   */
+  public synchronized boolean isResolved(DatanodeRegistration dnReg) {
+    try {
+      return !getDatanode(dnReg).getNetworkLocation()
+            .equals(NetworkTopology.UNRESOLVED);
+    } catch (IOException ie) {
+      return false;
+    }
+  }
+    
   /////////////////////////////////////////////////////////
   //
   // These methods are called by datanodes
@@ -1863,8 +1945,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * 
    * @see DataNode#register()
    */
-  public synchronized void registerDatanode(DatanodeRegistration nodeReg,
-                                            String networkLocation
+  public synchronized void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {
 
     if (!verifyNodeRegistration(nodeReg)) {
@@ -1931,9 +2012,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       // update cluster map
       clusterMap.remove(nodeS);
       nodeS.updateRegInfo(nodeReg);
-      nodeS.setNetworkLocation(networkLocation);
-      clusterMap.add(nodeS);
       nodeS.setHostName(hostName);
+      nodeS.setNetworkLocation(NetworkTopology.UNRESOLVED);
+      addToResolutionQueue(nodeS);
         
       // also treat the registration message as a heartbeat
       synchronized(heartbeats) {
@@ -1958,9 +2039,9 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     }
     // register new datanode
     DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, networkLocation, hostName);
+      = new DatanodeDescriptor(nodeReg, NetworkTopology.UNRESOLVED, hostName);
     unprotectedAddDatanode(nodeDescr);
-    clusterMap.add(nodeDescr);
+    addToResolutionQueue(nodeDescr);
       
     // also treat the registration message as a heartbeat
     synchronized(heartbeats) {
@@ -2406,6 +2487,10 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       setDatanodeDead(node);
       throw new DisallowedDatanodeException(node);
     }
+    
+    if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
+      return null; //drop the block report if the dn hasn't been resolved
+    }
 
     //
     // Modify the (block-->datanode) map, according to the difference
@@ -2455,6 +2540,7 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+    node.setBlockReportProcessed(true);
     return obsolete.toArray(new Block[obsolete.size()]);
   }
 

+ 14 - 3
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -544,11 +544,10 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   ////////////////////////////////////////////////////////////////
   /** 
    */
-  public DatanodeRegistration register(DatanodeRegistration nodeReg,
-                                       String networkLocation
+  public DatanodeRegistration register(DatanodeRegistration nodeReg
                                        ) throws IOException {
     verifyVersion(nodeReg.getVersion());
-    namesystem.registerDatanode(nodeReg, networkLocation);
+    namesystem.registerDatanode(nodeReg);
       
     return nodeReg;
   }
@@ -579,6 +578,18 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
       assert(xferResults[0] == null && deleteList[0] == null);
       return new DatanodeCommand(DatanodeProtocol.DNA_REGISTER);
     }
+    //
+    // If the datanode has (just) been resolved and we haven't ever processed 
+    // a block report from it yet, ask for one now.
+    //
+    if (!namesystem.blockReportProcessed(nodeReg)) {
+      // If we never processed a block report from this datanode, we shouldn't
+      // have any work for that as well
+      assert(xferResults[0] == null && deleteList[0] == null);
+      if (namesystem.isResolved(nodeReg)) {
+        return new DatanodeCommand(DatanodeProtocol.DNA_BLOCKREPORT);
+      }
+    }
         
     //
     // Ask to perform pending transfers, if any

+ 157 - 36
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.mapred;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.net.*;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -74,8 +75,12 @@ class JobInProgress {
 
   JobPriority priority = JobPriority.NORMAL;
   JobTracker jobtracker = null;
-  Map<String,List<TaskInProgress>> hostToMaps =
-    new HashMap<String,List<TaskInProgress>>();
+
+  // NetworkTopology Node to the set of TIPs
+  Map<Node, List<TaskInProgress>> nodesToMaps;
+  
+  private int maxLevel;
+  
   private int taskCompletionEventTracker = 0; 
   List<TaskCompletionEvent> taskCompletionEvents;
     
@@ -110,7 +115,8 @@ class JobInProgress {
     NUM_FAILED_REDUCES,
     TOTAL_LAUNCHED_MAPS,
     TOTAL_LAUNCHED_REDUCES,
-    DATA_LOCAL_MAPS
+    DATA_LOCAL_MAPS,
+    RACK_LOCAL_MAPS
   }
   private Counters jobCounters = new Counters();
   
@@ -204,8 +210,61 @@ class JobInProgress {
     jobMetrics.removeTag("counter");
     jobMetrics.remove();
   }
+    
+  private Node getParentNode(Node node, int level) {
+    for (int i = 0; node != null && i < level; i++) {
+      node = node.getParent();
+    }
+    return node;
+  }
+  private void printCache (Map<Node, List<TaskInProgress>> cache) {
+    LOG.info("The taskcache info:");
+    for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
+      List <TaskInProgress> tips = n.getValue();
+      LOG.info("Cached TIPs on node: " + n.getKey());
+      for (TaskInProgress tip : tips) {
+        LOG.info("tip : " + tip.getTIPId());
+      }
+    }
+  }
   
+  private Map<Node, List<TaskInProgress>> createCache(
+                         JobClient.RawSplit[] splits, int maxLevel) {
+    Map<Node, List<TaskInProgress>> cache = 
+      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
+    for (int i = 0; i < splits.length; i++) {
+      for(String host: splits[i].getLocations()) {
+        Node node = jobtracker.resolveAndAddToTopology(host);
+        if (node == null) {
+          continue;
+        }
+        if (node.getLevel() < maxLevel) {
+          LOG.warn("Got a host whose level is: " + node.getLevel() +
+              ". Should get at least a level of value: " + maxLevel);
+          return null;
+        }
+        for (int j = 0; j < maxLevel; j++) {
+          node = getParentNode(node, j);
+          List<TaskInProgress> hostMaps = cache.get(node);
+          if (hostMaps == null) {
+            hostMaps = new ArrayList<TaskInProgress>();
+            cache.put(node, hostMaps);
+            hostMaps.add(maps[i]);
+          }
+          //check whether the hostMaps already contains an entry for a TIP
+          //This will be true for nodes that are racks and multiple nodes in
+          //the rack contain the input for a tip. Note that if it already
+          //exists in the hostMaps, it must be the last element there since
+          //we process one TIP at a time sequentially in the split-size order
+          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
+            hostMaps.add(maps[i]);
+          }
+        }
+      }
+    }
+    return cache;
+  }
   /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
@@ -233,17 +292,11 @@ class JobInProgress {
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
       maps[i] = new TaskInProgress(jobId, jobFile, 
-                                   splits[i].getClassName(),
-                                   splits[i].getBytes(), 
+                                   splits[i], 
                                    jobtracker, conf, this, i);
-      for(String host: splits[i].getLocations()) {
-        List<TaskInProgress> hostMaps = hostToMaps.get(host);
-        if (hostMaps == null) {
-          hostMaps = new ArrayList<TaskInProgress>();
-          hostToMaps.put(host, hostMaps);
-        }
-        hostMaps.add(maps[i]);              
-      }
+    }
+    if (numMapTasks > 0) { 
+      nodesToMaps = createCache(splits, (maxLevel = jobtracker.getNumTaskCacheLevels()));
     }
         
     // if no split is returned, job is considered completed and successful
@@ -410,7 +463,13 @@ class JobInProgress {
       }
 
       if (null != ttStatus){
-        httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + 
+        String host;
+        if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
+          host = NetUtils.getStaticResolution(ttStatus.getHost());
+        } else {
+          host = ttStatus.getHost();
+        }
+        httpTaskLogLocation = "http://" + host + ":" + 
           ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
           status.getTaskId();
       }
@@ -571,9 +630,9 @@ class JobInProgress {
       return null;
     }
     
-    ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+    
     int target = findNewTask(tts, clusterSize, status.mapProgress(), 
-                             maps, mapCache);
+                             maps, nodesToMaps);
     if (target == -1) {
       return null;
     }
@@ -695,6 +754,13 @@ class JobInProgress {
     return trackerErrors;
   }
     
+  private boolean shouldRunSpeculativeTask(TaskInProgress task, 
+                                          double avgProgress,
+                                          String taskTracker) {
+    return task.hasSpeculativeTask(avgProgress) && 
+           !task.hasRunOnMachine(taskTracker);
+  }
+  
   /**
    * Find a new task to run.
    * @param tts The task tracker that is asking for a task
@@ -709,14 +775,16 @@ class JobInProgress {
                           int clusterSize,
                           double avgProgress,
                           TaskInProgress[] tasks,
-                          List cachedTasks) {
+                          Map<Node,List<TaskInProgress>> cachedTasks) {
     String taskTracker = tts.getTrackerName();
+    int specTarget = -1;
 
     //
     // Update the last-known clusterSize
     //
     this.clusterSize = clusterSize;
 
+    Node node = jobtracker.getNode(tts.getHost());
     //
     // Check if too many tasks of this job have failed on this
     // tasktracker prior to assigning it a new one.
@@ -734,22 +802,56 @@ class JobInProgress {
         
     //
     // See if there is a split over a block that is stored on
-    // the TaskTracker checking in.  That means the block
-    // doesn't have to be transmitted from another node.
+    // the TaskTracker checking in or the rack it belongs to and so on till
+    // maxLevel.  That means the block
+    // doesn't have to be transmitted from another node/rack/and so on.
+    // The way the cache is updated is such that in every lookup, the TIPs
+    // which are complete is removed. Running/Failed TIPs are not removed
+    // since we want to have locality optimizations even for FAILED/SPECULATIVE
+    // tasks.
     //
-    if (cachedTasks != null) {
-      Iterator i = cachedTasks.iterator();
-      while (i.hasNext()) {
-        TaskInProgress tip = (TaskInProgress)i.next();
-        i.remove();
-        if (tip.isRunnable() && 
-            !tip.isRunning() &&
-            !tip.hasFailedOnMachine(taskTracker)) {
-          LOG.info("Choosing cached task " + tip.getTIPId());
-          int cacheTarget = tip.getIdWithinJob();
-          jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-          return cacheTarget;
+    if (cachedTasks != null && node != null) {
+      Node key = node;
+      for (int level = 0; level < maxLevel && key != null; level++) {
+        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+        if (cacheForLevel != null) {
+          Iterator<TaskInProgress> i = cacheForLevel.iterator();
+          while (i.hasNext()) {
+            TaskInProgress tip = i.next();
+            // we remove only those TIPs that are data-local (the host having
+            // the data is running the task). We don't remove TIPs that are 
+            // rack-local for example since that would negatively impact
+            // the performance of speculative and failed tasks (imagine a case
+            // where we schedule one TIP rack-local and after sometime another
+            // tasktracker from the same rack is asking for a task, and the TIP
+            // in question has either failed or could be a speculative task
+            // candidate)
+            if (tip.isComplete() || level == 0) {
+              i.remove();
+            }
+            if (tip.isRunnable() && 
+                !tip.isRunning() &&
+                !tip.hasFailedOnMachine(taskTracker)) {
+              int cacheTarget = tip.getIdWithinJob();
+              if (level == 0) {
+                LOG.info("Choosing data-local task " + tip.getTIPId());
+                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+              } else if (level == 1){
+                LOG.info("Choosing rack-local task " + tip.getTIPId());
+                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+              } else {
+                LOG.info("Choosing cached task at level " + level + " " + 
+                          tip.getTIPId());
+              }
+              return cacheTarget;
+            }
+            if (specTarget == -1 &&
+                shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+              specTarget = tip.getIdWithinJob();
+            }
+          }
         }
+        key = key.getParent();
       }
     }
 
@@ -759,7 +861,6 @@ class JobInProgress {
     // a std. task to run.
     //
     int failedTarget = -1;
-    int specTarget = -1;
     for (int i = 0; i < tasks.length; i++) {
       TaskInProgress task = tasks[i];
       if (task.isRunnable()) {
@@ -781,8 +882,7 @@ class JobInProgress {
             LOG.info("Choosing normal task " + tasks[i].getTIPId());
             return i;
           } else if (specTarget == -1 &&
-                     task.hasSpeculativeTask(avgProgress) && 
-                     !task.hasRunOnMachine(taskTracker)) {
+                     shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
             specTarget = i;
           }
         }
@@ -989,7 +1089,28 @@ class JobInProgress {
         
     // the case when the map was complete but the task tracker went down.
     if (wasComplete && !isComplete) {
-      if (tip.isMapTask()){
+      if (tip.isMapTask()) {
+        // Put the task back in the cache. This will help locality for cases
+        // where we have a different TaskTracker from the same rack/switch
+        // asking for a task. Note that we don't add the TIP to the host's cache
+        // again since we don't execute a failed TIP on the same TT again, 
+        // and also we bother about only those TIPs that were successful
+        // earlier (wasComplete and !isComplete) 
+        // (since they might have been removed from the cache of other 
+        // racks/switches, if the input split blocks were present there too)
+        for (String host : tip.getSplitLocations()) {
+          Node node = jobtracker.getNode(host);
+          for (int level = 1; (node != null && level < maxLevel); level++) {
+            node = getParentNode(node, level);
+            if (node == null) {
+              break;
+            }
+            List<TaskInProgress> list = nodesToMaps.get(node);
+            if (list != null) {
+              list.add(tip);
+            }
+          }
+        }
         finishedMapTasks -= 1;
       }
     }

+ 1 - 0
src/java/org/apache/hadoop/mapred/JobInProgress_Counter.properties

@@ -7,4 +7,5 @@ NUM_FAILED_REDUCES.name=       Failed reduce tasks
 TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
 TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
 DATA_LOCAL_MAPS.name=          Data-local map tasks
+RACK_LOCAL_MAPS.name=          Rack-local map tasks
 

+ 122 - 2
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -57,8 +57,14 @@ import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 /*******************************************************
@@ -77,6 +83,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   State state = State.INITIALIZING;
   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
 
+  private DNSToSwitchMapping dnsToSwitchMapping;
+  private NetworkTopology clusterMap = new NetworkTopology();
+  private ResolutionThread resThread = new ResolutionThread();
+  private int numTaskCacheLevels; // the max level of a host in the network topology
+  
   /**
    * A client tried to submit a job before the Job Tracker was ready.
    */
@@ -544,6 +555,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
   // (trackerID --> last sent HeartBeatResponse)
   Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
     new TreeMap<String, HeartbeatResponse>();
+
+  // (trackerHostName --> Node (NetworkTopology))
+  Map<String, Node> trackerNameToNodeMap = 
+    Collections.synchronizedMap(new TreeMap<String, Node>());
+  
+  // Number of resolved entries
+  int numResolved;
     
   //
   // Watch and expire TaskTracker objects using these structures.
@@ -724,6 +742,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
     // Same with 'localDir' except it's always on the local disk.
     jobConf.deleteLocalFiles(SUBDIR);
+    this.dnsToSwitchMapping = (DNSToSwitchMapping)ReflectionUtils.newInstance(
+        conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
+            DNSToSwitchMapping.class), conf);
+    this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
+        NetworkTopology.DEFAULT_HOST_LEVEL);
     synchronized (this) {
       state = State.RUNNING;
     }
@@ -751,6 +774,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
     this.expireTrackersThread.start();
+    this.resThread.start();
     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
     this.retireJobsThread.start();
     this.initJobsThread = new Thread(this.initJobs, "initJobs");
@@ -835,6 +859,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         ex.printStackTrace();
       }
     }
+    if (this.resThread != null) {
+      LOG.info("Stopping DNSToSwitchMapping Resolution thread");
+      this.resThread.interrupt();
+      try {
+        this.resThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
     if (this.completedJobsStoreThread != null &&
         this.completedJobsStoreThread.isAlive()) {
       LOG.info("Stopping completedJobsStore thread");
@@ -1147,6 +1180,31 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     }
   }
 
+  public Node resolveAndAddToTopology(String name) {
+    List <String> tmpList = new ArrayList<String>(1);
+    tmpList.add(name);
+    List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
+    if (rNameList == null || rNameList.size() == 0) {
+      return null;
+    }
+    String rName = rNameList.get(0);
+    String networkLoc = NodeBase.normalize(rName);
+    Node node = null;
+    if ((node = clusterMap.getNode(networkLoc+"/"+name)) == null) {
+      node = new NodeBase(name, networkLoc);
+      clusterMap.add(node);
+    }
+    return node;
+  }
+  public Node getNode(String name) {
+    return trackerNameToNodeMap.get(name);
+  }
+  public int getNumTaskCacheLevels() {
+    return numTaskCacheLevels;
+  }
+  public int getNumResolvedTaskTrackers() {
+    return numResolved;
+  }
   ////////////////////////////////////////////////////
   // InterTrackerProtocol
   ////////////////////////////////////////////////////
@@ -1172,8 +1230,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
       throw new DisallowedTaskTrackerException(status);
     }
 
-    // First check if the last heartbeat response got through 
+    // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
+    
     HeartbeatResponse prevHeartbeatResponse =
       trackerToHeartbeatResponseMap.get(trackerName);
 
@@ -1346,15 +1405,76 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
         if (initialContact) {
           trackerExpiryQueue.add(trackerStatus);
+          resThread.addToResolutionQueue(trackerStatus);
         }
       }
     }
 
     updateTaskStatuses(trackerStatus);
-
+    
     return true;
   }
 
+  private class ResolutionThread extends Thread {
+    private LinkedBlockingQueue<TaskTrackerStatus> queue = 
+      new LinkedBlockingQueue <TaskTrackerStatus>();
+    public ResolutionThread() {
+      setName("DNSToSwitchMapping reolution Thread");
+      setDaemon(true);
+    }
+    public void addToResolutionQueue(TaskTrackerStatus t) {
+      while (!queue.add(t)) {
+        LOG.warn("Couldn't add to the Resolution queue now. Will " +
+                 "try again");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+    public void run() {
+      while (!isInterrupted()) {
+        try {
+          int size;
+          if((size = queue.size()) == 0) {
+            Thread.sleep(1000);
+            continue;
+          }
+          List <TaskTrackerStatus> statuses = 
+            new ArrayList<TaskTrackerStatus>(size);
+          queue.drainTo(statuses);
+          List<String> dnHosts = new ArrayList<String>(size);
+          for (TaskTrackerStatus t : statuses) {
+            dnHosts.add(t.getHost());
+          }
+          List<String> rName = dnsToSwitchMapping.resolve(dnHosts);
+          if (rName == null) {
+            continue;
+          }
+          int i = 0;
+          for (String m : rName) {
+            String host = statuses.get(i++).getHost();
+            String networkLoc = NodeBase.normalize(m);
+            Node node = null;
+            if (clusterMap.getNode(networkLoc+"/"+host) == null) {
+              node = new NodeBase(host, networkLoc);
+              clusterMap.add(node);
+              trackerNameToNodeMap.put(host, node);
+            }
+            numResolved++;
+          }
+        } catch (InterruptedException ie) {
+          LOG.warn(getName() + " exiting, got interrupted: " + 
+                   StringUtils.stringifyException(ie));
+          return;
+        } catch (Throwable t) {
+          LOG.error(getName() + " got an exception: " +
+              StringUtils.stringifyException(t));
+        }
+      }
+      LOG.warn(getName() + " exiting...");
+    }
+  }
+  
   /**
    * Returns a task we'd like the TaskTracker to execute right now.
    *
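As an illustration of the path convention used by resolveAndAddToTopology above (the hostname and rack below are made up), a resolved tracker becomes addressable in the topology by its full path, networkLoc + "/" + hostname:

// Assuming the configured DNSToSwitchMapping resolves the host to "/r2":
Node node = jobTracker.resolveAndAddToTopology("host1.rack2.com");
// Internally, clusterMap.getNode("/r2/host1.rack2.com") now returns this node.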

+ 12 - 0
src/java/org/apache/hadoop/mapred/Task.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.net.*;
 
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
@@ -405,6 +406,17 @@ abstract class Task implements Writable, Configurable {
     }
     this.mapOutputFile.setConf(this.conf);
     this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+    // Add the static resolutions (required for junit testcases that
+    // simulate multiple nodes on a single physical node).
+    String hostToResolved[] = conf.getStrings("hadoop.net.static.resolutions");
+    if (hostToResolved != null) {
+      for (String str : hostToResolved) {
+        String name = str.substring(0, str.indexOf('='));
+        String resolvedName = str.substring(str.indexOf('=') + 1);
+        NetUtils.addStaticResolution(name, resolvedName);
+      }
+    }
   }
 
   public Configuration getConf() {

+ 14 - 7
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JobClient.RawSplit;
 
 
 /*************************************************************
@@ -62,8 +63,7 @@ class TaskInProgress {
 
   // Defines the TIP
   private String jobFile = null;
-  private String splitClass = null;
-  private BytesWritable split = null;
+  private RawSplit rawSplit;
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
@@ -115,17 +115,17 @@ class TaskInProgress {
   private TreeMap<String, Boolean> tasksToKill = new TreeMap<String, Boolean>();
   
   private Counters counters = new Counters();
+  
 
   /**
    * Constructor for MapTask
    */
   public TaskInProgress(String jobid, String jobFile, 
-                        String splitClass, BytesWritable split, 
+                        RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
     this.jobFile = jobFile;
-    this.splitClass = splitClass;
-    this.split = split;
+    this.rawSplit = rawSplit;
     this.jobtracker = jobtracker;
     this.job = job;
     this.conf = conf;
@@ -233,7 +233,7 @@ class TaskInProgress {
    * Whether this is a map task
    */
   public boolean isMapTask() {
-    return split != null;
+    return rawSplit != null;
   }
     
   /**
@@ -569,6 +569,13 @@ class TaskInProgress {
     
   }
 
+  /**
+   * Get the split locations 
+   */
+  public String[] getSplitLocations() {
+    return rawSplit.getLocations();
+  }
+  
   /**
    * Get the Status of the tasks managed by this TIP
    */
@@ -726,7 +733,7 @@ class TaskInProgress {
 
     if (isMapTask()) {
       t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
-                      splitClass, split);
+                      rawSplit.getClassName(), rawSplit.getBytes());
     } else {
       t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
     }

+ 19 - 1
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -389,10 +389,15 @@ public class TaskTracker
   synchronized void initialize() throws IOException {
     // use configured nameserver & interface to get local hostname
     this.fConf = new JobConf(originalConf);
-    this.localHostname =
+    if (fConf.get("slave.host.name") != null) {
+      this.localHostname = fConf.get("slave.host.name");
+    }
+    if (localHostname == null) {
+      this.localHostname =
       DNS.getDefaultHost
       (fConf.get("mapred.tasktracker.dns.interface","default"),
        fConf.get("mapred.tasktracker.dns.nameserver","default"));
+    }
  
     //check local disk
     checkLocalDirs(this.fConf.getLocalDirs());
@@ -1444,6 +1449,19 @@ public class TaskTracker
       }
       task.localizeConfiguration(localJobConf);
       
+      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
+      if (staticResolutions != null && staticResolutions.size() > 0) {
+        StringBuffer str = new StringBuffer();
+
+        for (int i = 0; i < staticResolutions.size(); i++) {
+          String[] hostToResolved = staticResolutions.get(i);
+          str.append(hostToResolved[0]+"="+hostToResolved[1]);
+          if (i != staticResolutions.size() - 1) {
+            str.append(',');
+          }
+        }
+        localJobConf.set("hadoop.net.static.resolutions", str.toString());
+      }
       OutputStream out = localFs.create(localTaskFile);
       try {
         localJobConf.write(out);
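The property written above is parsed back in Task.configure(); its format is a comma-separated list of host=resolvedName pairs. A small illustration with made-up hostnames:

// What localJobConf ends up carrying for two simulated hosts:
conf.set("hadoop.net.static.resolutions",
         "host1.rack1.com=localhost,host2.rack2.com=localhost");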

+ 42 - 0
src/java/org/apache/hadoop/net/DNSToSwitchMapping.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.List;
+
+/**
+ * An interface that should be implemented to allow pluggable 
+ * DNS-name/IP-address to RackID resolvers.
+ *
+ */
+public interface DNSToSwitchMapping {
+  /**
+   * Resolves a list of DNS-names/IP-addresses and returns a list of
+   * switch information (network paths). A one-to-one correspondence must be
+   * maintained between the elements of the two lists.
+   * For an input element such as x.y.com, the switch information returned
+   * must be a network path of the form /foo/rack, where / is the root and
+   * 'foo' is the switch to which 'rack' is connected.
+   * Note that the hostname/IP-address is not part of the returned path.
+   * The number of components in the network path is determined by the
+   * network topology of the cluster.
+   * @param names
+   * @return list of resolved network paths
+   */
+  public List<String> resolve(List<String> names);
+}
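A minimal sketch of an implementation of this contract (the class name and the hostname convention are hypothetical, used only to illustrate the one-path-per-name, order-preserving requirement):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.net.DNSToSwitchMapping;

public class SuffixBasedMapping implements DNSToSwitchMapping {
  // Returns exactly one network path per input name, preserving order.
  public List<String> resolve(List<String> names) {
    List<String> paths = new ArrayList<String>(names.size());
    for (String name : names) {
      // e.g. "host1.rack2.example.com" -> "/rack2"; anything else -> a default
      String[] parts = name.split("\\.");
      if (parts.length > 1 && parts[1].startsWith("rack")) {
        paths.add("/" + parts[1]);
      } else {
        paths.add("/default-rack");
      }
    }
    return paths;
  }
}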

+ 60 - 0
src/java/org/apache/hadoop/net/NetUtils.java

@@ -19,6 +19,9 @@ package org.apache.hadoop.net;
 
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Map.Entry;
+import java.util.*;
+
 import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +33,9 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 public class NetUtils {
   private static final Log LOG = LogFactory.getLog(NetUtils.class);
+  
+  private static Map<String, String> hostToResolved = 
+                                     new HashMap<String, String>();
 
   /**
    * Get the socket factory for the given class according to its
@@ -122,6 +128,9 @@ public class NetUtils {
       port = addr.getPort();
     }
   
+    if (getStaticResolution(hostname) != null) {
+      hostname = getStaticResolution(hostname);
+    }
     return new InetSocketAddress(hostname, port);
   }
 
@@ -164,4 +173,55 @@ public class NetUtils {
     }
     return oldAddr + ":" + oldPort;
   }
+  
+  /**
+   * Adds a static resolution for host. This can be used to make fake
+   * hostnames point to a well-known host. For example, some testcases
+   * require daemons with different hostnames to run on the same machine;
+   * to create connections to these daemons, one can set up mappings from
+   * those hostnames to "localhost".
+   * {@link NetUtils#getStaticResolution(String)} can be used to query for
+   * the actual hostname. 
+   * @param host
+   * @param resolvedName
+   */
+  public static void addStaticResolution(String host, String resolvedName) {
+    synchronized (hostToResolved) {
+      hostToResolved.put(host, resolvedName);
+    }
+  }
+  
+  /**
+   * Retrieves the resolved name for the passed host. The resolved name must
+   * have been set earlier using 
+   * {@link NetUtils#addStaticResolution(String, String)}
+   * @param host
+   * @return the resolution
+   */
+  public static String getStaticResolution(String host) {
+    synchronized (hostToResolved) {
+      return hostToResolved.get(host);
+    }
+  }
+  
+  /**
+   * This is used to get all the resolutions that were added using
+   * {@link NetUtils#addStaticResolution(String, String)}. The return
+   * value is a List each element of which contains an array of String 
+   * of the form String[0]=hostname, String[1]=resolved-hostname
+   * @return the list of resolutions
+   */
+  public static List <String[]> getAllStaticResolutions() {
+    synchronized (hostToResolved) {
+      Set <Entry <String, String>>entries = hostToResolved.entrySet();
+      if (entries.size() == 0) {
+        return null;
+      }
+      List <String[]> l = new ArrayList<String[]>(entries.size());
+      for (Entry<String, String> e : entries) {
+        l.add(new String[] {e.getKey(), e.getValue()});
+      }
+    return l;
+    }
+  }
 }
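A short usage sketch of the static-resolution helpers above, as they might appear inside test code (the hostnames are made up; this mirrors how the mini-cluster tests later in this patch use them):

// Point two fake tracker hostnames at the local machine, then query them.
NetUtils.addStaticResolution("host1.rack1.com", "localhost");
NetUtils.addStaticResolution("host2.rack1.com", "localhost");
String resolved = NetUtils.getStaticResolution("host1.rack1.com"); // "localhost"
for (String[] pair : NetUtils.getAllStaticResolutions()) {
  System.out.println(pair[0] + " -> " + pair[1]);
}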

+ 12 - 5
src/java/org/apache/hadoop/net/NetworkTopology.java

@@ -38,6 +38,8 @@ import org.apache.commons.logging.LogFactory;
  */
 public class NetworkTopology {
   public final static String DEFAULT_RACK = "/default-rack";
+  public final static String UNRESOLVED = "";
+  public final static int DEFAULT_HOST_LEVEL = 2;
   public static final Log LOG = 
     LogFactory.getLog("org.apache.hadoop.net.NetworkTopology");
     
@@ -389,11 +391,16 @@ public class NetworkTopology {
    *          a path-like string representation of a node
    * @return a reference to the node; null if the node is not in the tree
    */
-  private Node getNode(String loc) {
-    loc = NodeBase.normalize(loc);
-    if (!NodeBase.ROOT.equals(loc))
-      loc = loc.substring(1);
-    return clusterMap.getLoc(loc);
+  public Node getNode(String loc) {
+    netlock.readLock().lock();
+    try {
+      loc = NodeBase.normalize(loc);
+      if (!NodeBase.ROOT.equals(loc))
+        loc = loc.substring(1);
+      return clusterMap.getLoc(loc);
+    } finally {
+      netlock.readLock().unlock();
+    }
   }
     
   /** Return the total number of racks */

+ 2 - 0
src/java/org/apache/hadoop/net/Node.java

@@ -30,6 +30,8 @@ package org.apache.hadoop.net;
 public interface Node {
   /** Return the string representation of this node's network location */
   public String getNetworkLocation();
+  /** Set the node's network location */
+  public void setNetworkLocation(String location);
   /** Return this node's name */
   public String getName();
   /** Return this node's parent */

+ 3 - 0
src/java/org/apache/hadoop/net/NodeBase.java

@@ -84,6 +84,9 @@ public class NodeBase implements Node {
   /** Return this node's network location */
   public String getNetworkLocation() { return location; }
   
+  /** Set this node's network location */
+  public void setNetworkLocation(String location) { this.location = location; }
+  
   /** Return this node's path */
   public static String getPath(Node node) {
     return node.getNetworkLocation()+PATH_SEPARATOR_STR+node.getName();

+ 139 - 0
src/java/org/apache/hadoop/net/ScriptBasedMapping.java

@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.net;
+
+import java.util.*;
+import java.io.*;
+import java.net.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.conf.*;
+
+/**
+ * This class implements the {@link DNSToSwitchMapping} interface using a 
+ * script configured via topology.script.file.name .
+ */
+public final class ScriptBasedMapping implements Configurable, 
+DNSToSwitchMapping
+{
+  private String scriptName;
+  private Configuration conf;
+  private int maxArgs; //max hostnames per call of the script
+  private Map<String, String> cache = new TreeMap<String, String>();
+  private static Log LOG = 
+    LogFactory.getLog("org.apache.hadoop.net.ScriptBasedMapping");
+  public void setConf (Configuration conf) {
+    this.scriptName = conf.get("topology.script.file.name");
+    this.maxArgs = conf.getInt("topology.script.number.args", 20);
+    this.conf = conf;
+  }
+  public Configuration getConf () {
+    return conf;
+  }
+
+  public ScriptBasedMapping() {}
+  
+  public List<String> resolve(List<String> names) {
+    List <String> m = new ArrayList<String>(names.size());
+    
+    if (scriptName == null) {
+      for (int i = 0; i < names.size(); i++) {
+        m.add(NetworkTopology.DEFAULT_RACK);
+      }
+      return m;
+    }
+    List<String> hosts = new ArrayList<String>(names.size());
+    for (String name : names) {
+      name = getHostName(name);
+      if (cache.get(name) == null) {
+        hosts.add(name);
+      } 
+    }
+    
+    int i = 0;
+    String output = runResolveCommand(hosts);
+    if (output != null) {
+      StringTokenizer allSwitchInfo = new StringTokenizer(output);
+      while (allSwitchInfo.hasMoreTokens()) {
+        String switchInfo = allSwitchInfo.nextToken();
+        cache.put(hosts.get(i++), switchInfo);
+      }
+    }
+    for (String name : names) {
+      //now everything is in the cache
+      name = getHostName(name);
+      if (cache.get(name) != null) {
+        m.add(cache.get(name));
+      } else { //resolve all or nothing
+        return null;
+      }
+    }
+    return m;
+  }
+  
+  private String runResolveCommand(List<String> args) {
+    InetAddress ipaddr = null;
+    int loopCount = 0;
+    if (args.size() == 0) {
+      return null;
+    }
+    StringBuffer allOutput = new StringBuffer();
+    int numProcessed = 0;
+    while (numProcessed != args.size()) {
+      int start = maxArgs * loopCount;
+      List <String> cmdList = new ArrayList<String>();
+      cmdList.add(scriptName);
+      for (numProcessed = start; numProcessed < (start + maxArgs) && 
+           numProcessed < args.size(); numProcessed++) {
+        try {
+          ipaddr = InetAddress.getByName(args.get(numProcessed));
+        } catch (UnknownHostException uh) {
+          return null;
+        }
+        cmdList.add(ipaddr.getHostAddress()); 
+      }
+      File dir = null;
+      String userDir;
+      if ((userDir = System.getProperty("user.dir")) != null) {
+        dir = new File(userDir);
+      }
+      ShellCommandExecutor s = new ShellCommandExecutor(
+                                   cmdList.toArray(new String[0]), dir);
+      try {
+        s.execute();
+        allOutput.append(s.getOutput() + " ");
+      } catch (Exception e) {
+        LOG.warn(StringUtils.stringifyException(e));
+        return null;
+      }
+      loopCount++; 
+    }
+    return allOutput.toString();
+  }
+  private String getHostName(String hostWithPort) {
+    int j;
+    if ((j = hostWithPort.indexOf(':')) != -1) {
+      hostWithPort = hostWithPort.substring(0, j);
+    }
+    return hostWithPort;
+  }
+}
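A usage sketch for this mapping (the demo class and the script path are assumptions for the example, not something this patch ships):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.ScriptBasedMapping;

public class TopologyScriptDemo {          // hypothetical demo class
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Site-specific script printing one network path per address passed to it.
    conf.set("topology.script.file.name", "/etc/hadoop/topology.sh"); // assumed
    conf.setInt("topology.script.number.args", 20);

    ScriptBasedMapping mapping = new ScriptBasedMapping();
    mapping.setConf(conf);
    List<String> paths = mapping.resolve(Arrays.asList("10.1.2.3", "10.1.2.4"));
    // With no script configured, every name resolves to
    // NetworkTopology.DEFAULT_RACK instead.
    System.out.println(paths);
  }
}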

+ 160 - 17
src/test/org/apache/hadoop/dfs/MiniDFSCluster.java

@@ -18,7 +18,11 @@
 package org.apache.hadoop.dfs;
 
 import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.BufferedOutputStream;
+import java.io.PipedOutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,6 +30,7 @@ import java.util.Collection;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.*;
 import org.apache.hadoop.dfs.FSConstants.DatanodeReportType;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,11 +60,11 @@ public class MiniDFSCluster {
   private Configuration conf;
   private NameNode nameNode;
   private int numDataNodes;
-  private int curDatanodesNum = 0;
   private ArrayList<DataNodeProperties> dataNodes = 
                          new ArrayList<DataNodeProperties>();
   private File base_dir;
   private File data_dir;
+  private DNSToSwitchMapping dnsToSwitchMapping;
   
   
   /**
@@ -85,7 +90,7 @@ public class MiniDFSCluster {
   public MiniDFSCluster(Configuration conf,
                         int numDataNodes,
                         StartupOption nameNodeOperation) throws IOException {
-    this(0, conf, numDataNodes, false, false, nameNodeOperation, null);
+    this(0, conf, numDataNodes, false, false, nameNodeOperation, null, null, null);
   }
   
   /**
@@ -105,7 +110,28 @@ public class MiniDFSCluster {
                         int numDataNodes,
                         boolean format,
                         String[] racks) throws IOException {
-    this(0, conf, numDataNodes, format, true, null, racks);
+    this(0, conf, numDataNodes, format, true, null, racks, null, null);
+  }
+  
+  /**
+   * Modify the config and start up the servers.  The rpc and info ports for
+   * servers are guaranteed to use free ports.
+   * <p>
+   * NameNode and DataNode directory creation and configuration will be
+   * managed by this class.
+   *
+   * @param conf the base configuration to use in starting the servers.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostname for each DataNode
+   */
+  public MiniDFSCluster(Configuration conf,
+                        int numDataNodes,
+                        boolean format,
+                        String[] racks, String[] hosts) throws IOException {
+    this(0, conf, numDataNodes, format, true, null, racks, hosts, null);
   }
   
   /**
@@ -133,8 +159,8 @@ public class MiniDFSCluster {
                         boolean manageDfsDirs,
                         StartupOption operation,
                         String[] racks) throws IOException {
-    this(0, conf, numDataNodes, format, manageDfsDirs, operation, racks, null);
- 
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, 
+        racks, null, null);
   }
 
   /**
@@ -164,6 +190,38 @@ public class MiniDFSCluster {
                         StartupOption operation,
                         String[] racks,
                         long[] simulatedCapacities) throws IOException {
+    this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, operation, racks, null, 
+        simulatedCapacities);
+  }
+  
+  /**
+   * NOTE: if possible, the other constructors that don't have nameNode port 
+   * parameter should be used as they will ensure that the servers use free ports.
+   * <p>
+   * Modify the config and start up the servers.  
+   * 
+   * @param nameNodePort suggestion for which rpc port to use.  caller should
+   *          use getNameNodePort() to get the actual port used.
+   * @param conf the base configuration to use in starting the servers.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param format if true, format the NameNode and DataNodes before starting up
+   * @param manageDfsDirs if true, the data directories for servers will be
+   *          created and dfs.name.dir and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the servers.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostnames of each DataNode
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   */
+  public MiniDFSCluster(int nameNodePort, 
+                        Configuration conf,
+                        int numDataNodes,
+                        boolean format,
+                        boolean manageDfsDirs,
+                        StartupOption operation,
+                        String[] racks, String hosts[],
+                        long[] simulatedCapacities) throws IOException {
     this.conf = conf;
     try {
       UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login(conf));
@@ -201,10 +259,12 @@ public class MiniDFSCluster {
                      operation == StartupOption.FORMAT ||
                      operation == StartupOption.REGULAR) ?
       new String[] {} : new String[] {"-"+operation.toString()};
+    conf.setClass("topology.node.switch.mapping.impl", 
+                   StaticMapping.class, DNSToSwitchMapping.class);
     nameNode = NameNode.createNameNode(args, conf);
     
     // Start the DataNodes
-    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, simulatedCapacities);
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, simulatedCapacities);
     
     if (numDataNodes > 0) {
       while (!isClusterUp()) {
@@ -235,15 +295,17 @@ public class MiniDFSCluster {
    * @param operation the operation with which to start the DataNodes.  If null
    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
    * @param racks array of strings indicating the rack that each DataNode is on
+   * @param hosts array of strings indicating the hostnames for each DataNode
    * @param simulatedCapacities array of capacities of the simulated data nodes
    *
    * @throws IllegalStateException if NameNode has been shutdown
    */
   public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
                              boolean manageDfsDirs, StartupOption operation, 
-                             String[] racks,
+                             String[] racks, String[] hosts,
                              long[] simulatedCapacities) throws IOException {
 
+    int curDatanodesNum = dataNodes.size();
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get("dfs.blockreport.initialDelay") == null) {
       conf.setLong("dfs.blockreport.initialDelay", 0);
@@ -262,6 +324,18 @@ public class MiniDFSCluster {
       throw new IllegalArgumentException( "The length of racks [" + racks.length
           + "] is less than the number of datanodes [" + numDataNodes + "].");
     }
+    if (hosts != null && numDataNodes > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of datanodes [" + numDataNodes + "].");
+    }
+    //Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      System.out.println("Generating host names for datanodes");
+      hosts = new String[numDataNodes];
+      for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+        hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+      }
+    }
 
     if (simulatedCapacities != null 
         && numDataNodes > simulatedCapacities.length) {
@@ -271,8 +345,8 @@ public class MiniDFSCluster {
     }
 
     // Set up the right ports for the datanodes
-    conf.set("dfs.datanode.address", "0.0.0.0:0");
-    conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+    conf.set("dfs.datanode.address", "127.0.0.1:0");
+    conf.set("dfs.datanode.http.address", "127.0.0.1:0");
     
     String[] args = (operation == null ||
                      operation == StartupOption.FORMAT ||
@@ -293,9 +367,6 @@ public class MiniDFSCluster {
         }
         dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath()); 
       }
-      if (racks != null) {
-        dnConf.set("dfs.datanode.rack", racks[i-curDatanodesNum]);
-      }
       if (simulatedCapacities != null) {
         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
@@ -303,13 +374,38 @@ public class MiniDFSCluster {
       }
       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
                          + dnConf.get("dfs.data.dir"));
+      if (hosts != null) {
+        dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
+        System.out.println("Starting DataNode " + i + " with hostname set to: " 
+                           + dnConf.get("slave.host.name"));
+      }
+      if (racks != null) {
+        String name = hosts[i - curDatanodesNum];
+        System.out.println("Adding node with hostname : " + name + " to rack "+
+                            racks[i-curDatanodesNum]);
+        StaticMapping.addNodeToRack(name, racks[i-curDatanodesNum]);
+      }
       Configuration newconf = new Configuration(dnConf); // save config
-      dataNodes.add(new DataNodeProperties(
-                     DataNode.createDataNode(dnArgs, dnConf), 
-                     newconf, dnArgs));
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+      }
+      DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+      //since the HDFS does things based on IP:port, we need to add the mapping
+      //for IP:port to rackId
+      String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
+      if (racks != null) {
+        int port = dn.getSelfAddr().getPort();
+        System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
+                            " to rack " + racks[i-curDatanodesNum]);
+        StaticMapping.addNodeToRack(ipAddr + ":" + port,
+                                  racks[i-curDatanodesNum]);
+      }
+      DataNode.runDatanodeDaemon(dn);
+      dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
     }
     curDatanodesNum += numDataNodes;
     this.numDataNodes += numDataNodes;
+    waitActive();
   }
   
   
@@ -334,9 +430,39 @@ public class MiniDFSCluster {
       boolean manageDfsDirs, StartupOption operation, 
       String[] racks
       ) throws IOException {
-    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null);
+    startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null, null);
   }
   
+  /**
+   * Modify the config and start up additional DataNodes.  The info port for
+   * DataNodes is guaranteed to use a free port.
+   *  
+   *  Data nodes can run with the name node in the mini cluster or
+   *  a real name node. For example, running with a real name node is useful
+   *  when running simulated data nodes with a real name node.
+   *  If minicluster's name node is null assume that the conf has been
+   *  set with the right address:port of the name node.
+   *
+   * @param conf the base configuration to use in starting the DataNodes.  This
+   *          will be modified as necessary.
+   * @param numDataNodes Number of DataNodes to start; may be zero
+   * @param manageDfsDirs if true, the data directories for DataNodes will be
+   *          created and dfs.data.dir will be set in the conf
+   * @param operation the operation with which to start the DataNodes.  If null
+   *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
+   * @param racks array of strings indicating the rack that each DataNode is on
+   * @param simulatedCapacities array of capacities of the simulated data nodes
+   *
+   * @throws IllegalStateException if NameNode has been shutdown
+   */
+  public void startDataNodes(Configuration conf, int numDataNodes, 
+                             boolean manageDfsDirs, StartupOption operation, 
+                             String[] racks,
+                             long[] simulatedCapacities) throws IOException {
+    startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
+                   simulatedCapacities);
+    
+  }
   /**
    * If the NameNode is running, attempt to finalize a previous upgrade.
    * When this method return, the NameNode should be finalized, but
@@ -512,15 +638,32 @@ public class MiniDFSCluster {
     InetSocketAddress addr = new InetSocketAddress("localhost",
                                                    getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
+    DatanodeInfo[] dnInfos;
 
     // make sure all datanodes are alive
-    while( client.datanodeReport(DatanodeReportType.LIVE).length
+    while((dnInfos = client.datanodeReport(DatanodeReportType.LIVE)).length
         != numDataNodes) {
       try {
         Thread.sleep(500);
       } catch (Exception e) {
       }
     }
+    int numResolved = 0;
+    do {
+      numResolved = 0;
+      for (DatanodeInfo info : dnInfos) {
+        if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
+          numResolved++;
+        } else {
+          try {
+            Thread.sleep(500);
+          } catch (Exception e) {
+          }
+          dnInfos = client.datanodeReport(DatanodeReportType.LIVE);
+          break;
+        }
+      }
+    } while (numResolved != numDataNodes);
 
     client.close();
   }

+ 20 - 4
src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java

@@ -60,7 +60,7 @@ import org.apache.log4j.Level;
  * Then the benchmark executes the specified number of operations using 
  * the specified number of threads and outputs the resulting stats.
  */
-public class NNThroughputBenchmark {
+public class NNThroughputBenchmark implements FSConstants {
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
   private static final int BLOCK_SIZE = 16;
 
@@ -568,10 +568,8 @@ public class NNThroughputBenchmark {
       NamespaceInfo nsInfo = nameNode.versionRequest();
       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
       DataNode.setNewStorageID(dnRegistration);
-      // get network location
-      String networkLoc = NetworkTopology.DEFAULT_RACK;
       // register datanode
-      dnRegistration = nameNode.register(dnRegistration, networkLoc);
+      dnRegistration = nameNode.register(dnRegistration);
     }
 
     void sendHeartbeat() throws IOException {
@@ -677,6 +675,24 @@ public class NNThroughputBenchmark {
         datanodes[idx].sendHeartbeat();
         prevDNName = datanodes[idx].dnRegistration.getName();
       }
+      int numResolved = 0;
+      DatanodeInfo[] dnInfos = nameNode.getDatanodeReport(DatanodeReportType.ALL);
+      do {
+        numResolved = 0;
+        for (DatanodeInfo info : dnInfos) {
+          if (!info.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
+            numResolved++;
+          } else {
+            try {
+              Thread.sleep(2);
+            } catch (Exception e) {
+            }
+            dnInfos = nameNode.getDatanodeReport(DatanodeReportType.LIVE);
+            break;
+          }
+        }
+      } while (numResolved != nrDatanodes);
+
       // create files 
       FileGenerator nameGenerator;
       nameGenerator = new FileGenerator(getBaseDir(), 100);

+ 1 - 1
src/test/org/apache/hadoop/dfs/TestDatanodeBlockScanner.java

@@ -65,7 +65,7 @@ public class TestDatanodeBlockScanner extends TestCase {
    */
   private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
                                           Path file) throws IOException {
-    URL url = new URL("http://" + dn.getHostName() + ":" + dn.getInfoPort() +
+    URL url = new URL("http://localhost:" + dn.getInfoPort() +
                       "/blockScannerReport?listblocks");
     long lastWarnTime = System.currentTimeMillis();
     long verificationTime = 0;

+ 95 - 8
src/test/org/apache/hadoop/mapred/MiniMRCluster.java

@@ -22,6 +22,9 @@ import java.util.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.net.NetUtils;
 
 /**
  * This class creates a single-process Map-Reduce cluster for junit testing.
@@ -48,7 +51,7 @@ public class MiniMRCluster {
    */
   class JobTrackerRunner implements Runnable {
     private JobTracker tracker = null;
-
+    
     JobConf jc = null;
         
     public boolean isUp() {
@@ -70,6 +73,8 @@ public class MiniMRCluster {
       try {
         jc = createJobConf();
         jc.set("mapred.local.dir","build/test/mapred/local");
+        jc.setClass("topology.node.switch.mapping.impl", 
+            StaticMapping.class, DNSToSwitchMapping.class);
         tracker = JobTracker.startTracker(jc);
         tracker.offerService();
       } catch (Throwable e) {
@@ -104,11 +109,15 @@ public class MiniMRCluster {
     volatile boolean isDead = false;
     int numDir;
 
-    TaskTrackerRunner(int trackerId, int numDir) throws IOException {
+    TaskTrackerRunner(int trackerId, int numDir, String hostname) 
+    throws IOException {
       this.trackerId = trackerId;
       this.numDir = numDir;
       localDirs = new String[numDir];
       conf = createJobConf();
+      if (hostname != null) {
+        conf.set("slave.host.name", hostname);
+      }
       conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
       conf.set("mapred.task.tracker.report.address", 
                 "127.0.0.1:" + taskTrackerPort);
@@ -132,6 +141,14 @@ public class MiniMRCluster {
       }
       conf.set("mapred.local.dir", localPath.toString());
       LOG.info("mapred.local.dir is " +  localPath);
+      try {
+        tt = new TaskTracker(conf);
+        isInitialized = true;
+      } catch (Throwable e) {
+        isDead = true;
+        tt = null;
+        LOG.error("task tracker " + trackerId + " crashed", e);
+      }
     }
         
     /**
@@ -139,9 +156,9 @@ public class MiniMRCluster {
      */
     public void run() {
       try {
-        tt = new TaskTracker(conf);
-        isInitialized = true;
-        tt.run();
+        if (tt != null) {
+          tt.run();
+        }
       } catch (Throwable e) {
         isDead = true;
         tt = null;
@@ -162,6 +179,11 @@ public class MiniMRCluster {
     public String[] getLocalDirs(){
       return localDirs;
     } 
+    
+    public TaskTracker getTaskTracker() {
+      return tt;
+    }
+    
     /**
      * Shut down the server and wait for it to finish.
      */
@@ -226,12 +248,24 @@ public class MiniMRCluster {
     result.set("fs.default.name", namenode);
     result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
     result.set("mapred.job.tracker.http.address", 
-                        "0.0.0.0:" + jobTrackerInfoPort);
+                        "127.0.0.1:" + jobTrackerInfoPort);
     // for debugging have all task output sent to the test output
     JobClient.setTaskOutputFilter(result, JobClient.TaskStatusFilter.ALL);
     return result;
   }
 
+  /**
+   * Create the config and the cluster.
+   * @param numTaskTrackers no. of tasktrackers in the cluster
+   * @param namenode the namenode
+   * @param numDir no. of directories
+   * @throws IOException
+   */
+  public MiniMRCluster(int numTaskTrackers, String namenode, int numDir, 
+      String[] racks, String[] hosts) throws IOException {
+    this(0, 0, numTaskTrackers, namenode, false, numDir, racks, hosts);
+  }
+  
   /**
    * Create the config and the cluster.
    * @param numTaskTrackers no. of tasktrackers in the cluster
@@ -259,12 +293,50 @@ public class MiniMRCluster {
          taskTrackerFirst, 1);
   } 
 
+  public MiniMRCluster(int jobTrackerPort,
+      int taskTrackerPort,
+      int numTaskTrackers,
+      String namenode,
+      boolean taskTrackerFirst, int numDir)
+  throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+        taskTrackerFirst, 1, null);
+  }
+  
+  public MiniMRCluster(int jobTrackerPort,
+      int taskTrackerPort,
+      int numTaskTrackers,
+      String namenode,
+      boolean taskTrackerFirst, int numDir,
+      String[] racks) throws IOException {
+    this(jobTrackerPort, taskTrackerPort, numTaskTrackers, namenode, 
+        taskTrackerFirst, numDir, racks, null);
+  }
+  
   public MiniMRCluster(int jobTrackerPort,
                        int taskTrackerPort,
                        int numTaskTrackers,
                        String namenode,
-                       boolean taskTrackerFirst, int numDir) throws IOException {
+                       boolean taskTrackerFirst, int numDir,
+                       String[] racks, String[] hosts) throws IOException {
 
+    if (racks != null && racks.length < numTaskTrackers) {
+      LOG.error("Invalid number of racks specified. It should be at least " +
+          "equal to the number of tasktrackers");
+      shutdown();
+    }
+    if (hosts != null && numTaskTrackers > hosts.length ) {
+      throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+          + "] is less than the number of tasktrackers [" + numTaskTrackers + "].");
+    }
+    //Generate some hostnames if required
+    if (racks != null && hosts == null) {
+      System.out.println("Generating host names for tasktrackers");
+      hosts = new String[numTaskTrackers];
+      for (int i = 0; i < numTaskTrackers; i++) {
+        hosts[i] = "host" + i + ".foo.com";
+      }
+    }
     this.jobTrackerPort = jobTrackerPort;
     this.taskTrackerPort = taskTrackerPort;
     this.jobTrackerInfoPort = 0;
@@ -290,7 +362,16 @@ public class MiniMRCluster {
 
     // Create the TaskTrackers
     for (int idx = 0; idx < numTaskTrackers; idx++) {
-      TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
+      if (racks != null) {
+        StaticMapping.addNodeToRack(hosts[idx],racks[idx]);
+      }
+      if (hosts != null) {
+        NetUtils.addStaticResolution(hosts[idx], "localhost");
+      }
+      TaskTrackerRunner taskTracker;
+      taskTracker = new TaskTrackerRunner(idx, numDir, 
+          hosts == null ? null : hosts[idx]);
+      
       Thread taskTrackerThread = new Thread(taskTracker);
       taskTrackerList.add(taskTracker);
       taskTrackerThreadList.add(taskTrackerThread);
@@ -311,6 +392,12 @@ public class MiniMRCluster {
     }
 
     // Wait till the MR cluster stabilizes
+    while(jobTracker.tracker.getNumResolvedTaskTrackers() != numTaskTrackers) {
+      try {
+        Thread.sleep(20);
+      } catch (InterruptedException ie) {
+      }
+    }
     waitUntilIdle();
   }
     

+ 191 - 0
src/test/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java

@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestRackAwareTaskPlacement extends TestCase {
+  private static final String rack1[] = new String[] {
+    "/r1"
+  };
+  private static final String hosts1[] = new String[] {
+    "host1.rack1.com"
+  };
+  private static final String rack2[] = new String[] {
+    "/r2", "/r2"
+  };
+  private static final String hosts2[] = new String[] {
+    "host1.rack2.com", "host2.rack2.com"
+  };
+  private static final String hosts3[] = new String[] {
+    "host3.rack1.com"
+  };
+  private static final String hosts4[] = new String[] {
+    "host1.rack2.com"
+  };
+  final Path inDir = new Path("/racktesting");
+  final Path outputPath = new Path("/output");
+  
+  public void testTaskPlacement() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 1;
+
+      /* Start 3 datanodes, one in rack r1, and two in r2. Create three
+       * files (splits).
+       * 1) file1, just after starting the datanode on r1, with 
+       *    a repl factor of 1, and,
+       * 2) file2 & file3 after starting the other two datanodes, with a repl 
+       *    factor of 3.
+       * At the end, file1 will be present on only datanode1, and, file2 and 
+       * file3, will be present on all datanodes. 
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file1"), (short)1);
+      dfs.startDataNodes(conf, 2, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file2"), (short)3);
+      writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file3"), (short)3);
+      
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" + 
+                 (dfs.getFileSystem()).getUri().getPort(); 
+      /* Run a job with the (only)tasktracker on rack2. The rack location
+       * of the tasktracker will determine how many data/rack local maps it
+       * runs. The hostname of the tasktracker is set to same as one of the 
+       * datanodes.
+       */
+      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack2, hosts4);
+      JobConf jobConf = mr.createJobConf();
+      if (fileSys.exists(outputPath)) {
+        fileSys.delete(outputPath);
+      }
+      /* The job is configured with three maps since there are three 
+       * (non-splittable) files. On rack2, there are two files and both
+       * have repl of three. The blocks for those files must therefore be
+       * present on all the datanodes, in particular, the datanodes on rack2.
+       * The third input file is pulled from rack1.
+       */
+      RunningJob job = launchJob(jobConf, 3);
+      Counters counters = job.getCounters();
+      assertEquals("Number of Data-local maps", 
+          counters.getCounter(JobInProgress.Counter.DATA_LOCAL_MAPS), 2);
+      assertEquals("Number of Rack-local maps", 
+          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 0);
+      mr.waitUntilIdle();
+      
+      /* Run a job with the (only)tasktracker on rack1.
+       */
+      mr = new MiniMRCluster(taskTrackers, namenode, 1, rack1, hosts3);
+      jobConf = mr.createJobConf();
+      if (fileSys.exists(outputPath)) {
+        fileSys.delete(outputPath);
+      }
+      /* The job is configured with three maps since there are three 
+       * (non-splittable) files. On rack1, because of the way in which repl
+       * was setup while creating the files, we will have all the three files. 
+       * Thus, a tasktracker will find all inputs in this rack.
+       */
+      job = launchJob(jobConf, 3);
+      counters = job.getCounters();
+      assertEquals("Number of Rack-local maps",
+          counters.getCounter(JobInProgress.Counter.RACK_LOCAL_MAPS), 3);
+      mr.waitUntilIdle();
+      
+    } finally {
+      if (fileSys != null) { 
+        fileSys.close(); 
+      }
+      if (dfs != null) { 
+        dfs.shutdown(); 
+      }
+      if (mr != null) { 
+        mr.shutdown();
+      }
+    }
+  }
+  private void writeFile(NameNode namenode, Configuration conf, Path name, 
+      short replication) throws IOException {
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = 
+      SequenceFile.createWriter(fileSys, conf, name, 
+                                BytesWritable.class, BytesWritable.class,
+                                CompressionType.NONE);
+    writer.append(new BytesWritable(), new BytesWritable());
+    writer.close();
+    fileSys.setReplication(name, replication);
+    waitForReplication(fileSys, namenode, name, replication);
+  }
+  private void waitForReplication(FileSystem fileSys, NameNode namenode, 
+      Path name, short replication) throws IOException {
+    //wait for the replication to happen
+    boolean isReplicationDone;
+    
+    do {
+      String[][] hints = fileSys.getFileCacheHints(name, 0, Long.MAX_VALUE);
+      if (hints[0].length == replication) {
+        isReplicationDone = true;
+      } else {
+        isReplicationDone = false;  
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        return;
+      }
+    } while(!isReplicationDone);
+  }
+  private RunningJob launchJob(JobConf jobConf, int numMaps) throws IOException {
+    jobConf.setJobName("TestForRackAwareness");
+    jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.setInputPath(inDir);
+    jobConf.setOutputPath(outputPath);
+    jobConf.setMapperClass(IdentityMapper.class);
+    jobConf.setReducerClass(IdentityReducer.class);
+    jobConf.setOutputKeyClass(BytesWritable.class);
+    jobConf.setOutputValueClass(BytesWritable.class);
+    jobConf.setNumMapTasks(numMaps);
+    jobConf.setNumReduceTasks(0);
+    jobConf.setJar("build/test/testjar/testjob.jar");
+    return JobClient.runJob(jobConf);
+  }
+}

+ 62 - 0
src/test/org/apache/hadoop/net/StaticMapping.java

@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.net;
+
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * Implements the {@link DNSToSwitchMapping} via static mappings. Used
+ * in testcases that simulate racks.
+ *
+ */
+public class StaticMapping extends Configured implements DNSToSwitchMapping {
+  public void setConf(Configuration conf) {
+    String[] mappings = conf.getStrings("hadoop.configured.node.mapping");
+    if (mappings != null) {
+      for (int i = 0; i < mappings.length; i++) {
+        String str = mappings[i];
+        String host = str.substring(0, str.indexOf('='));
+        String rack = str.substring(str.indexOf('=') + 1);
+        addNodeToRack(host, rack);
+      }
+    }
+  }
+  /* Only one instance per JVM */
+  private static Map<String, String> nameToRackMap = new HashMap<String, String>();
+  
+  static synchronized public void addNodeToRack(String name, String rackId) {
+    nameToRackMap.put(name, rackId);
+  }
+  public List<String> resolve(List<String> names) {
+    List<String> m = new ArrayList<String>();
+    synchronized (nameToRackMap) {
+      for (String name : names) {
+        String rackId;
+        if ((rackId = nameToRackMap.get(name)) != null) {
+          m.add(rackId);
+        } else {
+          m.add(NetworkTopology.DEFAULT_RACK);
+        }
+      }
+      return m;
+    }
+  }
+}
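A sketch of how a test wires this mapping in, mirroring what the MiniDFSCluster and MiniMRCluster changes above already do (rack paths and hostnames are made up):

Configuration conf = new Configuration();
conf.setClass("topology.node.switch.mapping.impl",
              StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack("host1.rack1.com", "/r1");
StaticMapping.addNodeToRack("host2.rack2.com", "/r2");
// Names that were never registered resolve to NetworkTopology.DEFAULT_RACK.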

Too many files were changed in this changeset, so some files are not shown.