
Merge -r 814222:815964 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@816022 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze, 16 years ago
Commit fe729c7b09
29 files changed, 1800 additions and 696 deletions
  1. CHANGES.txt (+14, -0)
  2. src/contrib/hdfsproxy/README (+42, -46)
  3. src/docs/src/documentation/content/xdocs/hdfsproxy.xml (+601, -0)
  4. src/docs/src/documentation/content/xdocs/site.xml (+1, -0)
  5. src/docs/src/documentation/resources/images/hdfsproxy-forward.jpg (BIN)
  6. src/docs/src/documentation/resources/images/hdfsproxy-overview.jpg (BIN)
  7. src/docs/src/documentation/resources/images/hdfsproxy-server.jpg (BIN)
  8. src/docs/src/documentation/resources/images/request-identify.jpg (BIN)
  9. src/java/org/apache/hadoop/hdfs/DFSClient.java (+33, -9)
  10. src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (+21, -1)
  11. src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (+7, -3)
  12. src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (+20, -3)
  13. src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (+5, -6)
  14. src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (+12, -9)
  15. src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (+172, -0)
  16. src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (+504, -0)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (+36, -0)
  18. src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (+17, -0)
  19. src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java (+35, -0)
  20. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+44, -22)
  21. src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (+7, -1)
  22. src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+4, -3)
  23. src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+4, -4)
  24. src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java (+0, -514)
  25. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (+2, -2)
  26. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java (+44, -0)
  27. src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (+95, -0)
  28. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (+2, -2)
  29. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (+78, -71)

+ 14 - 0
CHANGES.txt

@@ -88,6 +88,9 @@ Trunk (unreleased changes)
     HDFS-235. Add support for byte ranges in HftpFileSystem to serve
     range of bytes from a file. (Bill Zeller via suresh)
 
+    HDFS-385. Add support for an experimental API that allows a module external
+    to HDFS to specify how HDFS blocks should be placed. (dhruba)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -200,6 +203,13 @@ Trunk (unreleased changes)
     HDFS-412. Hadoop JMX usage makes Nagios monitoring impossible.
     (Brian Bockelman via tomwhite)
 
+    HDFS-472. Update hdfsproxy documentation. Adds a setup guide and design
+    document. (Zhiyong Zhang via cdouglas)
+
+    HDFS-617. Support non-recursive create().  (Kan Zhang via szetszwo)
+
+    HDFS-618. Support non-recursive mkdir().  (Kan Zhang via szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
@@ -286,6 +296,10 @@ Trunk (unreleased changes)
     HDFS-614. TestDatanodeBlockScanner obtains data directories directly from
     MiniHDFSCluster. (shv)
 
+    HDFS-612. Remove the use of org.mortbay.log.Log in FSDataset.  (szetszwo)
+
+    HDFS-622. checkMinReplication should count live nodes only. (shv)
+
 Release 0.20.1 - Unreleased
 
   IMPROVEMENTS

+ 42 - 46
src/contrib/hdfsproxy/README

@@ -1,51 +1,47 @@
-HDFSPROXY is an HTTPS proxy server that exposes the same HSFTP interface as a 
-real cluster. It authenticates users via user certificates and enforce access 
-control based on configuration files.
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
 
-Starting up an HDFSPROXY server is similar to starting up an HDFS cluster. 
-Simply run "hdfsproxy" shell command. The main configuration file is 
-hdfsproxy-default.xml, which should be on the classpath. hdfsproxy-env.sh 
-can be used to set up environmental variables. In particular, JAVA_HOME should 
-be set. Additional configuration files include user-certs.xml, 
-user-permissions.xml and ssl-server.xml, which are used to specify allowed user
-certs, allowed directories/files, and ssl keystore information for the proxy, 
-respectively. The location of these files can be specified in 
-hdfsproxy-default.xml. Environmental variable HDFSPROXY_CONF_DIR can be used to
-point to the directory where these configuration files are located. The 
-configuration files of the proxied HDFS cluster should also be available on the
-classpath (hdfs-default.xml and hdfs-site.xml).
+HDFS Proxy is a proxy server through which a hadoop client (through HSFTP) or a standard
+HTTPS client (wget, curl, etc) can talk to a hadoop server and, more importantly, pull data
+from the server. It puts an access control layer in front of the hadoop namenode server and extends
+its functionalities to allow hadoop cross-version data transfer.
 
-Mirroring those used in HDFS, a few shell scripts are provided to start and 
-stop a group of proxy servers. The hosts to run hdfsproxy on are specified in 
-hdfsproxy-hosts file, one host per line. All hdfsproxy servers are stateless 
-and run independently from each other. Simple load balancing can be set up by 
-mapping all hdfsproxy server IP addresses to a single hostname. Users should 
-use that hostname to access the proxy. If an IP address look up for that 
-hostname returns more than one IP addresses, an HFTP/HSFTP client will randomly
-pick one to use.
+HDFSPROXY can be configured/started via either Jetty or Tomcat with different supporting features.
 
-Command "hdfsproxy -reloadPermFiles" can be used to trigger reloading of 
-user-certs.xml and user-permissions.xml files on all proxy servers listed in 
-the hdfsproxy-hosts file. Similarly, "hdfsproxy -clearUgiCache" command can be 
-used to clear the UGI caches on all proxy servers.
+A) With Jetty-based Installation, supporting features include:
+> Single Hadoop source cluster data transfer
+> Single Hadoop version data transfer
+> Authenticate users via user SSL certificates with ProxyFilter installed
+> Enforce access control based on configuration files.
 
-For tomcat based installation.
-1. set up the environment and configuration files. 
-	 a) export HADOOP_CONF_DIR=${user.home}/devel/source-conf
-	 	source-conf directory should point to the source cluster's configuration directory, 
-	 	where core-site.xml, and hdfs-site.xml should already be correctly configured for 
-	 	the source cluster settings.
-	 b) export HDFSPROXY_CONF_DIR=${user.home}/devel/proxy-conf
-	  proxy-conf directory should point to the proxy's configuration directory, where 
-	  hdfsproxy-default.xml, etc, should already be properly configured.
+B) With Tomcat-based Installation, supporting features include:
+> Multiple Hadoop source cluster data transfer
+> Multiple Hadoop version data transfer
+> Authenticate users via user SSL certificates with ProxyFilter installed
+> Authentication and authorization via LDAP with LdapIpDirFilter installed
+> Access control based on configuration files if ProxyFilter is installed.
+> Access control based on LDAP entries if LdapIpDirFilter is installed.
+> Standard HTTPS Get Support for file transfer
 
-2. cd ==> hdfsproxy directory,  ant war
-	 
-3. download and install tomcat6, change tomcat conf/server.xml file to include https support. 
-	 uncomment item below SSL HTTP/1.1 Connector and add paths, resulting something look like this:
-	 <Connector port="8443" protocol="HTTP/1.1" SSLEnabled="true"
-               maxThreads="150" scheme="https" secure="true" keystoreFile="${user.home}/grid/hdfsproxy-conf/server2.keystore" 
-               keystorePass="changeme" keystoreType="JKS"  clientAuth="true" sslProtocol="TLS" />
-4. copy war file in step 2 to tomcat's webapps directory and rename it to ROOT.war
-5. export JAVA_OPTS="-Djavax.net.ssl.trustStore=${user.home}/grid/hdfsproxy-conf/server2.keystore -Djavax.net.ssl.trustStorePassword=changeme"
-6. start up tomcat with tomcat's bin/startup.sh 
+The detailed configuration/set-up guide is in the Forrest 
+documentation, which can be found at $HADOOP_HDFS_HOME/docs. In order to build the 
+documentation on your own from source, please use the following command in 
+the downloaded source folder:
+
+ant docs -Dforrest.home=<path to forrest> -Djava5.home=<path to jdk5>
+
+The documentation built this way will be placed under $HADOOP_HDFS_HOME/build/docs

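As a rough sketch of the standard HTTPS Get support listed above for the Tomcat-based installation, a plain Java client could pull a file through the proxy's /file endpoint as follows. The host name, port and file path are placeholders, and the JVM trust store is assumed to already trust the proxy certificate (for example via -Djavax.net.ssl.trustStore); if the proxy requires client certificates, a client keystore would also have to be configured.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import javax.net.ssl.HttpsURLConnection;

public class ProxyHttpsGet {
  public static void main(String[] args) throws Exception {
    // Placeholder proxy address and file path; adjust to the actual deployment.
    URL url = new URL("https://proxy.example.com:8443/file/user/data/part-00000");
    HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line);   // stream the (text) file contents to stdout
    }
    in.close();
    conn.disconnect();
  }
}
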
+ 601 - 0
src/docs/src/documentation/content/xdocs/hdfsproxy.xml

@@ -0,0 +1,601 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+
+<document>
+
+  <header>
+    <title> HDFS Proxy Guide</title>
+  </header>
+
+  <body>
+    <section>
+      <title> Introduction </title>
+      <p> HDFS Proxy is a proxy server through which a hadoop client (through HSFTP) or a standard
+        HTTPS client (wget, curl, etc) can talk to a hadoop server and, more importantly, pull data from
+        the server. It puts an access control layer in front of the hadoop namenode server and
+        extends its functionalities to allow hadoop cross-version data transfer. </p>
+    </section>
+
+    <section>
+      <title> Goals and Use Cases </title>
+      <section>
+        <title> Data Transfer from HDFS clusters </title>
+        <ul>
+          <li>User uses HSFTP protocol (hadoop distcp/fs, etc) to access HDFS proxy to copy out data stored on one or more HDFS clusters.</li>
+          <li>User uses HTTPS protocol (curl, wget, etc) to access HDFS proxy to copy out data stored on one or more HDFS clusters </li>
+        </ul>
+      </section>
+      
+      <section>
+        <title> Cross-version Data Transfer </title>
+        <p>There are multiple HDFS clusters, possibly running different hadoop versions, each holding
+          different data. A client needs to access this data in a standard way without worrying about
+          version compatibility issues. </p>
+      </section>
+      
+      <section>
+        <title> User Access Control </title>
+        <ul>
+          <li>User Access Control through SSL certificates</li>
+          <li>User Access Control through LDAP (Lightweight Directory Access Protocol) server</li>
+        </ul>
+      </section>
+      
+    </section>
+    
+    <section>
+      <title> Comparison with NameNode's H(S)FTP Interface </title>
+      <p>The NameNode starts an HTTP listener at <code>dfs.http.address</code> (default port 50070) when it starts up, and this listener provides an HFTP interface for the client. It can also start an HTTPS listener at <code>dfs.https.address</code> if <code>dfs.https.enable</code> is set to true (by default, <code>dfs.https.enable</code> is not defined) to provide an HSFTP interface for the client.</p>
+      <section>
+        <title>Advantages of Proxy Over NameNode HTTP(S) server</title>
+        <ol>
+          <li>We can centralize the access control layer in the proxy so that the NameNode server carries less of this burden. In this sense, HDFS proxy plays a filtering role to control data access to the NameNode and DataNodes. It is especially useful if the HDFS system has some sensitive data in it. 
+          </li>
+          <li> After modularizing HDFS proxy into a standalone package, we can decouple it from the complexity of the HDFS system and expand the proxy functionalities without worrying about affecting other HDFS system features.
+          </li>
+        </ol>
+      </section>
+      <section>
+        <title>Disadvantages of Using the Proxy Instead of Getting Data Directly from the H(S)FTP Interface: Slower Speed, Due to the Following</title>
+        <ol>
+          <li>HDFS proxy needs to first copy data from the source cluster, then transfer the data out to the client.</li>
+          <li> Unlike H(S)FTP interface, where file status listing, etc., is through NameNode server, and real data transfer is redirected to the real DataNode server, all data transfer under HDFS proxy is through the proxy server.</li>
+        </ol>        
+      </section>
+
+    </section>
+    
+    <section>
+      <title> Design </title>
+      <section>
+        <title> Design Overview </title>
+        <figure src="images/hdfsproxy-overview.jpg" alt="HDFS Proxy Architecture"/>
+        <p>As shown in the above figure, on the client side, the proxy server accepts requests from HSFTP clients and HTTPS clients. The requests pass through a filter module (containing one or more filters) for access control checking. Then the requests go through a delegation module, whose responsibility is to direct the requests to the right client version for accessing the source cluster. After that, the delegated client talks to the source cluster server through the RPC protocol using servlets. </p>
+      </section>
+  
+      <section>
+        <title> Filter Module: Proxy Authentication and Access Control </title>
+        <figure src="images/hdfsproxy-server.jpg" alt="HDFS Proxy Filters"/>
+        
+        <p> To realize proxy authentication and access control, we used a servlet filter. The filter module is very
+          flexible; it can be installed or disabled by simply changing the corresponding items in the deployment
+          descriptor file (web.xml). We implemented two filters in the proxy code: ProxyFilter and LdapIpDirFilter. How each filter works is described below.</p>
+               
+        <section>
+          <title>SSL certificate-based proxyFilter</title>
+          <ol>
+            <li>A user will use a pre-issued SSL certificate to access the proxy.</li>
+            <li>The proxy server will authenticate the user certificate.</li>
+            <li>The user’s authenticated identity (extracted from the user’s SSL certificate) is used to check access to data on the proxy.</li>
+            <li>User access information is stored in two configuration files, user-certs.xml and user-permissions.xml.</li>
+            <li>The proxy will forward the user’s authenticated identity to HDFS clusters for HDFS file permission checking</li>
+          </ol>
+        </section>
+        
+        <section>
+          <title>LDAP-based LdapIpDirFilter</title>
+          <ol>
+            <li>A standalone LDAP server needs to be set up to store user information as entries, and each entry contains userId, user group, IP address(es), allowable HDFS directories, etc.</li>
+            <li>An LDAP entry may contain multiple IP addresses with the same userId and group attribute to support headless accounts.</li>
+            <li>Upon receiving a request, the proxy server will extract the user's IP address from the request header, query the LDAP server with that IP address to get the directory permission information, then compare it with the requested path to make an allow/deny decision.</li>
+          </ol>
+        </section>
+        <p>The SSL-based proxyFilter provides strong PKI authentication and encryption; the proxy server can create a self-signed CA using OpenSSL and use that CA to sign and issue certificates to clients. </p>
+        <p>Managing access information through configuration files is a convenient way to start and easy to set up for a small user group. However, to scale to a large user group and to handle account management operations such as add, delete, and change access, a separate package or a different mechanism like an LDAP server is needed.</p>
+        <p>The schema for the entry attributes in the LDAP server should match what is used in the proxy. The schema that is currently used in the proxy is configurable through hdfsproxy-default.xml, but the attributes should always contain IP address (default as uniqueMember), userId (default as uid), user group (default as userClass), and allowable HDFS directories (default as documentLocation).</p>
+        <p>Users can also write their own filters to plug in the filter chain to realize extended functionalities.</p>
+      </section>
+      
+      <section>
+        <title> Delegation Module: HDFS Cross-version Data Transfer </title>
+        <figure src="images/hdfsproxy-forward.jpg" alt="HDFS Proxy Forwarding"/> 
+        <p>As shown in the Figure, the delegation module contains two parts: </p>
+        <ol>
+          <li>A Forwarding war, which plays the role of identifying the requests and directing the requests to the right HDFS client RPC version. </li>
+          <li>Several RPC client versions necessary to talk to all the HDFS source cluster servers. </li>
+        </ol>
+        <p>All servlets are packaged in the WAR files.</p>
+        <p>Strictly speaking, HDFS proxy does not by itself solve HDFS cross-version communication problem. However, through wrapping all the RPC client versions and delegating the client requests to the right version of RPC clients, HDFS proxy functions as if it can talk to multiple source clusters in different hadoop versions.</p>
+        <p>Packaging the servlets in the WAR files has several advantages:</p>
+        <ol>
+          <li>It reduces the complexity of writing our own ClassLoaders for different RPC clients. Servlet
+          container (Tomcat) already uses separate ClassLoaders for different WAR files.</li>
+          <li>Packaging is done by the Servlet container (Tomcat). For each client WAR file, its Servlets
+          only need to worry about its own version of source HDFS clusters.</li>
+        </ol>
+        <p>Note that the inter-communication between servlets in the forwarding war and those in the specific client version war can only be through built-in data types such as int, String, etc., as such data types are loaded first through the common classloader. </p>
+      </section>
+      
+      <section>
+        <title> Servlets: Where Data transfer Occurs</title>
+        <p>Proxy server functionality is implemented using servlets deployed under servlet container. Specifically, there are 3 proxy servlets <code>ProxyListPathsServlet</code>, <code>ProxyFileDataServlet</code>, and <code>ProxyStreamFile</code>. Together, they implement the same H(S)FTP interface as the original <code>ListPathsServlet</code>, <code>FileDataServlet</code>, and <code>StreamFile</code> servlets do on an HDFS cluster. In fact, the proxy servlets are subclasses of the original servlets with minor changes like retrieving client UGI from the proxy server, etc. All these three servlets are put into the client war files.</p>
+        <p>The forwarding proxy, which was implemented through <code>ProxyForwardServlet</code>, is put in a separate web application (ROOT.war). All client requests should be sent to the forwarding proxy. The forwarding proxy does not implement any functionality by itself. Instead, it simply forwards client requests to the right web applications with the right servlet paths.</p>
+        <p>Forwarding servlets forward requests to servlets in the right web applications through servlet cross-context communication, enabled by setting <code>crossContext="true"</code> in the servlet container's configuration file.</p>
+        <p>Proxy server will install a servlet, <code>ProxyFileForward</code>, which is a subclass of <code>ProxyForwardServlet</code>, on path /file, which exposes a simple HTTPS GET interface (internally delegates the work to <code>ProxyStreamFile</code> servlet via forwarding mechanism discussed above). This interface supports standard HTTP clients like curl, wget, etc. HTTPS client requests on the wire should look like <code>https://proxy_address/file/file_path</code></p>
+      </section>
+      
+      <section>
+        <title> Load Balancing and Identifying Requests through Domain Names </title>
+        <figure src="images/request-identify.jpg" alt="Request Identification"/> 
+        <p>The delegation module relies on the forwarding WAR to be able to identify the requests so that it can direct the requests to the right HDFS client RPC versions. Identifying the requests through the Domain Name, which can be extracted from the request header, is a straightforward way. Note that a Domain Name can have many aliases through CNAME records. By exploiting this feature, we can create a Domain Name, then create many aliases of this domain name, and finally make these aliases correspond to different client RPC request versions. At the same time, we may need many servers to do load balancing. We can make all these servers (with different IP addresses) point to the same Domain Name in a round-robin fashion. By doing this, we get basic load-balancing if we have multiple proxy servers running in the back-end.</p>
+      </section>
+    
+    </section>
+    
+    <section>
+      <title> Jetty-based Installation and Configuration </title>
+      <p>With Jetty-based installation, only a subset of the proxy features is supported.</p>
+      <section>
+        <title> Supporting Features </title>
+        <ul>
+          <li>Single Hadoop source cluster data transfer</li>
+          <li>Single Hadoop version data transfer</li>
+          <li>Authenticate users via user SSL certificates with <code>ProxyFilter</code> installed</li>
+          <li>Enforce access control based on configuration files.</li>
+        </ul>
+      </section>
+      
+      <section>
+        <title> Configuration Files </title>
+        <ol>
+          <li>
+            <strong>hdfsproxy-default.xml</strong>
+            <table>
+              <tr>
+                <th>Name</th>
+                <th>Description</th>
+              </tr>
+              <tr>
+                <td>hdfsproxy.https.address</td>
+                <td>the SSL port that hdfsproxy listens on. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.hosts</td>
+                <td>location of hdfsproxy-hosts file. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.dfs.namenode.address</td>
+                <td>namenode address of the HDFS cluster being proxied. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.https.server.keystore.resource</td>
+                <td>location of the resource from which ssl server keystore information will be extracted. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.user.permissions.file.location</td>
+                <td>location of the user permissions file. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.user.certs.file.location</td>
+                <td>location of the user certs file. </td>
+              </tr>
+              <tr>
+                <td>hdfsproxy.ugi.cache.ugi.lifetime</td>
+                <td> The lifetime (in minutes) of a cached ugi. </td>
+              </tr>
+            </table>     
+          </li>              
+          <li>     
+            <strong>ssl-server.xml</strong>
+            <table>
+              <tr>
+                <th>Name</th>
+                <th>Description</th>
+              </tr>
+              <tr>
+                <td>ssl.server.truststore.location</td>
+                <td>location of the truststore. </td>
+              </tr>
+              <tr>
+                <td>ssl.server.truststore.password</td>
+                <td>truststore password. </td>
+              </tr>
+              <tr>
+                <td>ssl.server.keystore.location</td>
+                <td>location of the keystore. </td>
+              </tr>
+              <tr>
+                <td>ssl.server.keystore.password</td>
+                <td>keystore password. </td>
+              </tr>
+              <tr>
+                <td>ssl.server.keystore.keypassword</td>
+                <td>key password. </td>
+              </tr>
+            </table>
+          </li>
+          <li>     
+            <strong>user-certs.xml</strong>
+            <table>
+              <tr>
+                <th>Name</th>
+                <th>Description</th>
+              </tr>
+              <tr>
+                <td colspan="2">This file defines the mappings from username to a comma-separated list of certificate serial numbers that the user is allowed to use. One mapping per user. Wildcard characters, such as "*" and "?", are not recognized. Any leading or trailing whitespace is stripped/ignored. In order for a user to be able to issue the "clearUgiCache" and "reloadPermFiles" commands, the certificate serial number he uses must also belong to the user "Admin". 
+                </td>
+              </tr>
+            </table>
+          </li>
+          <li>
+            <strong>user-permissions.xml</strong>
+            <table>
+              <tr>
+                <th>Name</th>
+                <th>Description</th>
+              </tr>
+              <tr>
+                <td colspan="2">This file defines the mappings from user name to a comma-separated list of directories/files that the user is allowed to access. One mapping per user. Wildcard characters, such as "*" and "?", are not recognized. For example, to match the "/output" directory, one can use "/output" or "/output/", but not "/output/*". Note that any leading or trailing whitespace is stripped/ignored for the name field. 
+                </td>
+              </tr>
+            </table>
+          </li> 
+        </ol>
+      </section>
+      <section>
+        <title> Build Process </title>        
+        <p>Under <code>$HADOOP_HDFS_HOME</code> do the following <br/>
+          <code> $ ant clean tar</code> <br/>
+          <code> $ cd src/contrib/hdfsproxy/</code> <br/>
+          <code> $ ant clean tar</code> <br/>
+          The <code>hdfsproxy-*.tar.gz</code> file will be generated under <code>$HADOOP_HDFS_HOME/build/contrib/hdfsproxy/</code>. Use this tar ball to proceed for the server start-up/shutdown process after necessary configuration. 
+        </p>
+      </section>  
+      <section>
+        <title> Server Start up and Shutdown</title>        
+        <p> Starting up a Jetty-based HDFS Proxy server is similar to starting up an HDFS cluster. Simply run the <code>hdfsproxy</code> shell command. The main configuration file is <code>hdfsproxy-default.xml</code>, which should be on the classpath. <code>hdfsproxy-env.sh</code> can be used to set up environmental variables. In particular, <code>JAVA_HOME</code> should be set. As listed above, additional configuration files include <code>user-certs.xml</code>, <code>user-permissions.xml</code> and <code>ssl-server.xml</code>, which are used to specify allowed user certs, allowed directories/files, and ssl keystore information for the proxy, respectively. The location of these files can be specified in <code>hdfsproxy-default.xml</code>. Environmental variable <code>HDFSPROXY_CONF_DIR</code> can be used to point to the directory where these configuration files are located. The configuration files (<code>hadoop-site.xml</code>, or <code>core-site.xml</code> and <code>hdfs-site.xml</code>) of the proxied HDFS cluster should also be available on the classpath.
+        </p>
+        <p> Mirroring those used in HDFS, a few shell scripts are provided to start and stop a group of proxy servers. The hosts to run hdfsproxy on are specified in <code>hdfsproxy-hosts</code> file, one host per line. All hdfsproxy servers are stateless and run independently from each other.  </p>
+        <p>
+          To start a group of proxy servers, do <br/>
+          <code> $ start-hdfsproxy.sh </code> 
+        </p>
+        <p>
+          To stop a group of proxy servers, do <br/>
+          <code> $ stop-hdfsproxy.sh </code> 
+        </p>
+        <p> 
+          To trigger reloading of <code>user-certs.xml</code> and <code>user-permissions.xml</code> files on all proxy servers listed in the <code>hdfsproxy-hosts</code> file, do <br/>       
+        <code> $ hdfsproxy -reloadPermFiles </code> 
+        </p>
+        <p>To clear the UGI caches on all proxy servers, do <br/>
+          <code> $ hdfsproxy -clearUgiCache </code> 
+        </p>
+      </section>     
+      
+      <section>
+        <title> Verification </title>
+        <p> Use HSFTP client <br/>
+          <code>bin/hadoop fs -ls "hsftp://proxy.address:port/"</code>
+        </p>
+      </section>
+
+    </section>      
+    
+    <section>
+        <title> Tomcat-based Installation and Configuration </title>
+        <p>With Tomcat-based installation, all HDFS Proxy features are supported.</p>
+        <section>
+          <title> Supporting Features </title>
+          <ul>
+            <li>Multiple Hadoop source cluster data transfer</li>
+            <li>Multiple Hadoop version data transfer</li>
+            <li>Authenticate users via user SSL certificates with <code>ProxyFilter</code> installed</li>
+            <li>Authentication and authorization via LDAP with <code>LdapIpDirFilter</code> installed</li>
+            <li>Access control based on configuration files if <code>ProxyFilter</code> is installed.</li>
+            <li>Access control based on LDAP entries if <code>LdapIpDirFilter</code> is installed.</li>
+            <li>Standard HTTPS Get Support for file transfer</li>
+          </ul>
+        </section>
+        
+        
+        <section>
+          <title> Source Cluster Related Configuration </title>
+          <ol>
+            <li>
+              <strong>hdfsproxy-default.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td>fs.default.name</td>
+                  <td>Source Cluster NameNode address</td>
+                </tr>
+                <tr>
+                  <td>dfs.block.size</td>
+                  <td>The block size for file transfers</td>
+                </tr>
+                <tr>
+                  <td>io.file.buffer.size</td>
+                  <td> The size of buffer for use in sequence files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations </td>
+                </tr>
+              </table>   
+            </li>
+          </ol>
+        </section>
+      
+        <section>
+          <title> SSL Related Configuration </title>
+          <ol>
+            <li>
+              <strong>hdfsproxy-default.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.user.permissions.file.location</td>
+                  <td>location of the user permissions file. </td>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.user.certs.file.location</td>
+                  <td>location of the user certs file. </td>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.ugi.cache.ugi.lifetime</td>
+                  <td> The lifetime (in minutes) of a cached ugi. </td>
+                </tr>
+              </table>     
+            </li>              
+            <li>     
+              <strong>user-certs.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2">This file defines the mappings from username to a comma-separated list of certificate serial numbers that the user is allowed to use. One mapping per user. Wildcard characters, such as "*" and "?", are not recognized. Any leading or trailing whitespace is stripped/ignored. In order for a user to be able to issue the "clearUgiCache" and "reloadPermFiles" commands, the certificate serial number he uses must also belong to the user "Admin". 
+                  </td>
+                </tr>
+              </table>
+            </li>
+            <li>
+              <strong>user-permissions.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2">This file defines the mappings from user name to a comma-separated list of directories/files that the user is allowed to access. One mapping per user. Wildcard characters, such as "*" and "?", are not recognized. For example, to match the "/output" directory, one can use "/output" or "/output/", but not "/output/*". Note that any leading or trailing whitespace is stripped/ignored for the name field. 
+                  </td>
+                </tr>
+              </table>
+            </li> 
+          </ol>
+        </section>
+        
+        <section>
+          <title> LDAP Related Configuration </title>
+          <ol>
+            <li>
+              <strong>hdfsproxy-default.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.ldap.initial.context.factory</td>
+                  <td>LDAP context factory. </td>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.ldap.provider.url</td>
+                  <td>LDAP server address. </td>
+                </tr>
+                <tr>
+                  <td>hdfsproxy.ldap.role.base</td>
+                  <td>LDAP role base. </td>
+                </tr>
+              </table>     
+            </li>              
+          </ol>
+        </section>
+        
+        
+        <section>
+          <title> Tomcat Server Related Configuration </title>
+          <ol>
+            <li>
+              <strong>tomcat-forward-web.xml</strong>
+              <table>
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2">This deployment descriptor file defines how servlets and filters are installed in the forwarding war (ROOT.war). The default filter installed is <code>LdapIpDirFilter</code>; you can change it to <code>ProxyFilter</code> with <code>org.apache.hadoop.hdfsproxy.ProxyFilter</code> as your <code>filter-class</code>. </td>
+                </tr>
+              </table>     
+            </li>
+            <li>
+              <strong>tomcat-web.xml</strong>
+              <table>                
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2">This deployment descriptor file defines how servlets and filters are installed in the client war. The default filter installed is <code>LdapIpDirFilter</code>; you can change it to <code>ProxyFilter</code> with <code>org.apache.hadoop.hdfsproxy.ProxyFilter</code> as your <code>filter-class</code>. </td>
+                </tr>
+              </table>     
+            </li>
+            <li>
+              <strong>$TOMCAT_HOME/conf/server.xml</strong>
+              <table>                
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2"> You need to change Tomcat's server.xml file under $TOMCAT_HOME/conf as detailed in <a href="http://tomcat.apache.org/tomcat-6.0-doc/ssl-howto.html">tomcat 6 ssl-howto</a>. Set <code>clientAuth="true"</code> if you need to authenticate clients. 
+                  </td>
+                </tr>
+              </table>     
+            </li>
+            <li>
+              <strong>$TOMCAT_HOME/conf/context.xml</strong>
+              <table>                
+                <tr>
+                  <th>Name</th>
+                  <th>Description</th>
+                </tr>
+                <tr>
+                  <td colspan="2"> You need to change Tomcat's context.xml file under $TOMCAT_HOME/conf by adding <code>crossContext="true"</code> after <code>Context</code>.
+                  </td>
+                </tr>
+              </table>     
+            </li>
+          </ol>
+        </section>
+        <section>
+          <title> Build and Deployment Process </title>  
+          <section>
+            <title> Build forwarding war (ROOT.war) </title>
+            <p>Suppose hdfsproxy-default.xml has been properly configured and it is under ${user.home}/proxy-root-conf dir. Under <code>$HADOOP_HDFS_HOME</code> do the following <br/>
+              <code> $ export HDFSPROXY_CONF_DIR=${user.home}/proxy-root-conf</code> <br/>
+              <code> $ ant clean tar</code> <br/>
+              <code> $ cd src/contrib/hdfsproxy/</code> <br/>
+              <code> $ ant clean forward</code> <br/>
+              The <code>hdfsproxy-forward-*.war</code> file will be generated under <code>$HADOOP_HDFS_HOME/build/contrib/hdfsproxy/</code>. Copy this war file to tomcat's webapps directory and rename it to ROOT.war (if the ROOT dir already exists, remove it first) for deployment. 
+            </p>
+          </section>
+          <section>
+            <title> Build cluster client war (client.war) </title>
+            <p>Suppose hdfsproxy-default.xml has been properly configured and it is under ${user.home}/proxy-client-conf dir. Under <code>$HADOOP_HDFS_HOME</code> do the following <br/>
+              <code> $ export HDFSPROXY_CONF_DIR=${user.home}/proxy-client-conf</code> <br/>
+              <code> $ ant clean tar</code> <br/>
+              <code> $ cd src/contrib/hdfsproxy/</code> <br/>
+              <code> $ ant clean war</code> <br/>
+              The <code>hdfsproxy-*.war</code> file will be generated under <code>$HADOOP_HDFS_HOME/build/contrib/hdfsproxy/</code>. Copy this war file to tomcat's webapps directory and rename it properly for deployment. 
+            </p>
+          </section>
+          <section>
+            <title> Handle Multiple Source Clusters </title>
+            <p> To proxy for multiple source clusters, you need to do the following:</p>
+            <ol>
+              <li>Build multiple client war with different names and different hdfsproxy-default.xml configurations</li>
+              <li>Make multiple alias using CNAME of the same Domain Name</li>
+              <li>Make sure the first part of the alias match the corresponding client war file name. For example, you have two source clusters, sc1 and sc2, and you made two alias of the same domain name, proxy1.apache.org and proxy2.apache.org, then you need to name the client war file as proxy1.war and proxy2.war respectively for your deployment.</li>
+            </ol>
+          </section>
+        </section>  
+        
+        <section>
+          <title> Server Start up and Shutdown</title>        
+          <p> Starting up and shutting down the Tomcat-based HDFS Proxy server is simply a matter of starting up and shutting down the tomcat server with tomcat's bin/startup.sh and bin/shutdown.sh scripts.</p>
+          <p> If you need to authenticate client certs, either set <code>truststoreFile</code> and <code>truststorePass</code> following the <a href="http://tomcat.apache.org/tomcat-6.0-doc/ssl-howto.html">tomcat 6 ssl-howto</a> in the configuration stage, or give the truststore location by doing the following <br/>
+            <code>export JAVA_OPTS="-Djavax.net.ssl.trustStore=${user.home}/truststore-location -Djavax.net.ssl.trustStorePassword=trustpass"</code> <br/>
+            before you start up tomcat.
+          </p>
+        </section>     
+        <section>
+          <title> Verification </title>
+          <p>HTTPS client <br/>
+            <code>curl -k "https://proxy.address:port/file/file-path"</code> <br/>
+            <code>wget --no-check-certificate "https://proxy.address:port/file/file-path"</code>
+          </p>
+          <p>HADOOP client <br/>
+            <code>bin/hadoop fs -ls "hsftp://proxy.address:port/"</code>
+          </p>
+        </section>
+        
+    </section>    
+    
+    <section>
+      <title> Hadoop Client Configuration </title>
+      <ul>
+        <li>
+          <strong>ssl-client.xml</strong>
+          <table>            
+            <tr>
+              <th>Name</th>
+              <th>Description</th>
+            </tr>
+            <tr>
+              <td>ssl.client.do.not.authenticate.server</td>
+              <td>if true, trust all server certificates, like curl's -k option</td>
+            </tr>
+            <tr>
+              <td>ssl.client.truststore.location</td>
+              <td>Location of truststore</td>
+            </tr>
+            <tr>
+              <td>ssl.client.truststore.password</td>
+              <td> truststore password </td>
+            </tr>
+            <tr>
+              <td>ssl.client.truststore.type</td>
+              <td> truststore type </td>
+            </tr>
+            <tr>
+              <td>ssl.client.keystore.location</td>
+              <td> Location of keystore </td>
+            </tr>
+            <tr>
+              <td>ssl.client.keystore.password</td>
+              <td> keystore password </td>
+            </tr>
+            <tr>
+              <td>ssl.client.keystore.type</td>
+              <td> keystore type </td>
+            </tr>
+            <tr>
+              <td>ssl.client.keystore.keypassword</td>
+              <td> keystore key password </td>
+            </tr>
+            <tr>
+              <td>ssl.expiration.warn.days</td>
+              <td> server certificate expiration warning threshold in days; 0 means no warning should be issued </td>
+            </tr>
+          </table>   
+        </li>
+      </ul>
+    </section>
+
+
+
+  </body>
+</document>

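The verification steps in the new guide use command-line clients. As a minimal sketch under the same assumptions (a proxy reachable at a placeholder address, and ssl-client.xml configured on the client classpath as described in the "Hadoop Client Configuration" section), the same check can be done from Java through the Hadoop FileSystem API:

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HsftpThroughProxy {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder proxy address; ssl-client.xml must be on the classpath.
    FileSystem fs = FileSystem.get(URI.create("hsftp://proxy.example.com:8443/"), conf);
    for (FileStatus stat : fs.listStatus(new Path("/"))) {   // equivalent of: hadoop fs -ls
      System.out.println(stat.getPath());
    }
    InputStream in = fs.open(new Path("/user/data/part-00000"));
    IOUtils.copyBytes(in, System.out, conf, true);           // stream one file to stdout
  }
}
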
+ 1 - 0
src/docs/src/documentation/content/xdocs/site.xml

@@ -45,6 +45,7 @@ See http://forrest.apache.org/docs/linking.html for more info.
 		<native_lib    				label="Native Libraries" 					href="native_libraries.html" />
 		<streaming 				label="Streaming"          				href="streaming.html" />
 		<fair_scheduler 			label="Fair Scheduler" 					href="fair_scheduler.html"/>
+        <hdfsproxy 			label="HDFS Proxy" 					href="hdfsproxy.html"/>
 		<cap_scheduler 		label="Capacity Scheduler" 			href="capacity_scheduler.html"/>
 		<SLA					 	label="Service Level Authorization" 	href="service_level_auth.html"/>
 		<vaidya    					label="Vaidya" 								href="vaidya.html"/>

BIN
src/docs/src/documentation/resources/images/hdfsproxy-forward.jpg


BIN
src/docs/src/documentation/resources/images/hdfsproxy-overview.jpg


BIN
src/docs/src/documentation/resources/images/hdfsproxy-server.jpg


BIN
src/docs/src/documentation/resources/images/request-identify.jpg


+ 33 - 9
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -522,6 +522,23 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         replication, blockSize, progress, buffersize);
   }
 
+  /**
+   * Call
+   * {@link #create(String,FsPermission,EnumSet,boolean,short,long,Progressable,int)}
+   * with createParent set to true.
+   */
+  public OutputStream create(String src, 
+      FsPermission permission,
+      EnumSet<CreateFlag> flag, 
+      short replication,
+      long blockSize,
+      Progressable progress,
+      int buffersize
+      ) throws IOException {
+    return create(src, permission, flag, true,
+        replication, blockSize, progress, buffersize);
+  }
+
   /**
    * Create a new dfs file with the specified block replication 
    * with write-progress reporting and return an output stream for writing
@@ -531,14 +548,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
    * @param flag do not check for file existence if true
+   * @param createParent create missing parent directory if true
    * @param replication block replication
    * @return output stream
    * @throws IOException
-   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, short, long)
+   * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
    */
   public OutputStream create(String src, 
                              FsPermission permission,
                              EnumSet<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize,
                              Progressable progress,
@@ -551,7 +570,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     OutputStream result = new DFSOutputStream(src, masked,
-        flag, replication, blockSize, progress, buffersize,
+        flag, createParent, replication, blockSize, progress, buffersize,
         conf.getInt("io.bytes.per.checksum", 512));
     leasechecker.put(src, result);
     return result;
@@ -951,8 +970,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
   /**
    */
+  @Deprecated
   public boolean mkdirs(String src) throws IOException {
-    return mkdirs(src, null);
+    return mkdirs(src, null, true);
   }
 
   /**
@@ -962,10 +982,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * @param src The path of the directory being created
    * @param permission The permission of the directory being created.
    * If permission == null, use {@link FsPermission#getDefault()}.
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
-   * @see ClientProtocol#mkdirs(String, FsPermission)
+   * @see ClientProtocol#mkdirs(String, FsPermission, boolean)
    */
-  public boolean mkdirs(String src, FsPermission permission)throws IOException{
+  public boolean mkdirs(String src, FsPermission permission, boolean createParent)throws IOException{
     checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -973,11 +994,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
     try {
-      return namenode.mkdirs(src, masked);
+      return namenode.mkdirs(src, masked, createParent);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
+                                     FileNotFoundException.class,
                                      FileAlreadyExistsException.class);
     }
   }
@@ -3120,10 +3142,10 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
     /**
      * Create a new output stream to the given DataNode.
-     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
+     * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable, boolean, short, long)
      */
     DFSOutputStream(String src, FsPermission masked, EnumSet<CreateFlag> flag,
-        short replication, long blockSize, Progressable progress,
+        boolean createParent, short replication, long blockSize, Progressable progress,
         int buffersize, int bytesPerChecksum) throws IOException {
       this(src, blockSize, progress, bytesPerChecksum);
 
@@ -3131,9 +3153,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
 
       try {
         namenode.create(
-            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), replication, blockSize);
+            src, masked, clientName, new EnumSetWritable<CreateFlag>(flag), createParent, replication, blockSize);
       } catch(RemoteException re) {
         throw re.unwrapRemoteException(AccessControlException.class,
+                                       FileAlreadyExistsException.class,
+                                       FileNotFoundException.class,
                                        NSQuotaExceededException.class,
                                        DSQuotaExceededException.class);
       }

+ 21 - 1
src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -209,6 +209,18 @@ public class DistributedFileSystem extends FileSystem {
         statistics);
   }
 
+  /**
+   * Same as create(), except fails if parent directory doesn't already exist.
+   * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable)
+   */
+  public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+
+    return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
+        false, replication, blockSize, progress, bufferSize), statistics);
+  }
+
   @Override
   public boolean setReplication(Path src, 
                                 short replication
@@ -268,9 +280,17 @@ public class DistributedFileSystem extends FileSystem {
     return stats;
   }
 
+  /**
+   * Create a directory with given name and permission, only when
+   * parent directory exists.
+   */
+  public boolean mkdir(Path f, FsPermission permission) throws IOException {
+    return dfs.mkdirs(getPathName(f), permission, false);
+  }
+
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return dfs.mkdirs(getPathName(f), permission);
+    return dfs.mkdirs(getPathName(f), permission, true);
   }
 
   /** {@inheritDoc} */

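The new createNonRecursive() and mkdir() methods above fail when the parent directory is missing instead of creating it, unlike create() and mkdirs(). A minimal usage sketch, assuming the default filesystem is HDFS and using placeholder paths, replication and block size:

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class NonRecursiveExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.default.name points at an HDFS cluster.
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);

    // mkdir() only succeeds when /existing/dir is already present;
    // it does not create missing parents the way mkdirs() does.
    dfs.mkdir(new Path("/existing/dir/child"), FsPermission.getDefault());

    // createNonRecursive() likewise refuses to create missing parent directories.
    FSDataOutputStream out = dfs.createNonRecursive(
        new Path("/existing/dir/file"), FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE), 4096, (short) 3, 64L * 1024 * 1024, null);
    out.close();
  }
}
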
+ 7 - 3
src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -44,9 +44,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 47: added a new method getServerDefaults(), see HDFS-578
+   * 48: modified mkdirs() to take an additional boolean parameter
    */
-  public static final long versionID = 47L;
+  public static final long versionID = 48L;
   
   ///////////////////////////////////////
   // File contents
@@ -101,6 +101,7 @@ public interface ClientProtocol extends VersionedProtocol {
    * @param clientName name of the current client.
    * @param flag indicates whether the file should be 
    * overwritten if it already exists or create if it does not exist or append.
+   * @param createParent create missing parent directory if true
    * @param replication block replication factor.
    * @param blockSize maximum block size.
    * 
@@ -115,6 +116,7 @@ public interface ClientProtocol extends VersionedProtocol {
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag, 
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException;
@@ -268,6 +270,7 @@ public interface ClientProtocol extends VersionedProtocol {
    *
    * @param src The path of the directory being created
    * @param masked The masked permission of the directory being created
+   * @param createParent create missing parent directory if true
    * @return True if the operation success.
    * @throws {@link AccessControlException} if permission to create file is 
    * denied by the system. As usually on the client side the exception will 
@@ -275,7 +278,8 @@ public interface ClientProtocol extends VersionedProtocol {
    * @throws QuotaExceededException if the operation would violate 
    *                                any quota restriction.
    */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException;
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+      throws IOException;
 
   /**
    * Get a listing of the indicated directory

+ 20 - 3
src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -25,6 +25,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.Class;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -64,6 +65,9 @@ import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.io.IOUtils;
@@ -772,18 +776,31 @@ public class Balancer implements Tool {
       }
     }
   }
+
+  /* Check that this Balancer is compatible with the Block Placement Policy
+   * used by the Namenode.
+   */
+  private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException {
+    if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != 
+        BlockPlacementPolicyDefault.class) {
+      throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+    }
+  }
   
   /** Default constructor */
-  Balancer() {
+  Balancer() throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(getConf());
   }
   
   /** Construct a balancer from the given configuration */
-  Balancer(Configuration conf) {
+  Balancer(Configuration conf) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
   } 
 
   /** Construct a balancer from the given configuration and threshold */
-  Balancer(Configuration conf, double threshold) {
+  Balancer(Configuration conf, double threshold) throws UnsupportedActionException {
+    checkReplicationPolicyCompatibility(conf);
     setConf(conf);
     this.threshold = threshold;
   }

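The new checkReplicationPolicyCompatibility() above makes the Balancer refuse to run when a non-default block placement policy is configured. The sketch below mirrors that check; the "dfs.block.replicator.classname" property name is an assumption (the BlockPlacementPolicy source is not part of this excerpt), so treat it as illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault;

public class PlacementPolicyCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A deployment plugging in a custom policy would set something like
    // (property name assumed, not confirmed by this diff):
    // conf.set("dfs.block.replicator.classname", "org.example.MyPlacementPolicy");
    BlockPlacementPolicy policy = BlockPlacementPolicy.getInstance(conf, null, null);
    if (policy.getClass() != BlockPlacementPolicyDefault.class) {
      System.err.println("Balancer does not support " + policy.getClass().getName());
    }
  }
}
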
+ 5 - 6
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -56,8 +56,6 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.io.IOUtils;
 
-import org.mortbay.log.Log;
-
 /**************************************************
  * FSDataset manages a set of data blocks.  Each block
  * has a unique name and an extent on disk.
@@ -689,10 +687,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
           }
         }
         volumes = fsvs; // replace array of volumes
+        DataNode.LOG.info("Completed FSVolumeSet.checkDirs. Removed "
+            + removed_vols.size() + " volumes. List of current volumes: "
+            + this);
       }
-      Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size + 
-          "volumes. List of current volumes: " +   toString());
-      
+
       return removed_vols;
     }
       
@@ -1608,7 +1607,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       }
     } // end of sync
     mlsec = System.currentTimeMillis() - mlsec;
-    DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+    DataNode.LOG.warn("Removed " + removed_blocks + " out of " + total_blocks +
         "(took " + mlsec + " millisecs)");
 
     // report the error

+ 12 - 9
src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java

@@ -120,7 +120,7 @@ public class BlockManager {
   Random r = new Random();
 
   // for block replicas placement
-  ReplicationTargetChooser replicator;
+  BlockPlacementPolicy replicator;
 
   BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     this(fsn, conf, DEFAULT_INITIAL_MAP_CAPACITY);
@@ -137,8 +137,8 @@ public class BlockManager {
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
-    this.replicator = new ReplicationTargetChooser(
-                         conf.getBoolean("dfs.replication.considerLoad", true),
+    this.replicator = BlockPlacementPolicy.getInstance(
+                         conf,
                          namesystem,
                          namesystem.clusterMap);
 
@@ -236,7 +236,7 @@ public class BlockManager {
    * @return true if the block has minimum replicas
    */
   boolean checkMinReplication(Block block) {
-    return (blocksMap.numNodes(block) >= minReplication);
+    return (countNodes(block).liveReplicas() >= minReplication);
   }
 
   /**
@@ -716,12 +716,13 @@ public class BlockManager {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
     DatanodeDescriptor srcNode;
+    INodeFile fileINode = null;
     int additionalReplRequired;
 
     synchronized (namesystem) {
       synchronized (neededReplications) {
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -768,9 +769,11 @@ public class BlockManager {
     }
 
     // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+    // It is costly to extract the filename for which chooseTargets is called,
+    // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
-                       replicator.chooseTarget(additionalReplRequired,
-                       srcNode, containingNodes, null, block.getNumBytes());
+                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
 
@@ -778,7 +781,7 @@ public class BlockManager {
       synchronized (neededReplications) {
         // Recheck since global lock was released
         // block should belong to a file
-        INodeFile fileINode = blocksMap.getINode(block);
+        fileINode = blocksMap.getINode(block);
         // abandoned block or block reopened for append
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
@@ -1245,7 +1248,7 @@ public class BlockManager {
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint);
+        addedNode, delNodeHint, replicator);
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {

+ 172 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node; 
+import org.apache.hadoop.util.ReflectionUtils;
+import java.util.*;
+
+/** 
+ * This class is used for choosing the desired number of targets
+ * for placing block replicas.
+ */
+public abstract class BlockPlacementPolicy {
+    
+  public static class NotEnoughReplicasException extends Exception {
+    private static final long serialVersionUID = 1L;
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+    
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If that is not possible, return as many as we can.
+   * 
+   * @param srcPath the file for which this chooseTarget is being invoked. 
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
+   * to re-replicate a block with size <i>blocksize</i>.
+   * If that is not possible, return as many as we can.
+   * The base implementation extracts the pathname of the file from the
+   * specified srcInode, but this could be a costly operation depending on the
+   * file system implementation. Concrete implementations of this class should
+   * override this method to avoid this overhead.
+   * 
+   * @param srcInode The inode of the file for which chooseTarget is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(srcInode.getFullPathName(), numOfReplicas, writer,
+                        chosenNodes, blocksize);
+  }
+
+  /**
+   * Verify that the block is replicated on at least minRacks different racks
+   * if there are more than minRacks racks in the system.
+   * 
+   * @param srcPath the full pathname of the file to be verified
+   * @param lBlk block with locations
+   * @param minRacks number of racks the block should be replicated to
+   * @return the difference between the required and the actual number of racks
+   * the block is replicated to.
+   */
+  abstract public int verifyBlockPlacement(String srcPath,
+                                           LocatedBlock lBlk,
+                                           int minRacks);
+  /**
+   * Decide whether deleting the specified replica of the block still makes 
+   * the block conform to the configured block placement policy.
+   * 
+   * @param srcInode The inode of the file to which the block-to-be-deleted belongs
+   * @param block The block to be deleted
+   * @param replicationFactor The required number of replicas for this block
+   * @param existingReplicas The replica locations of this block that are present
+                  on at least two unique racks. 
+   * @param moreExistingReplicas Replica locations of this block that are not
+                   listed in the previous parameter.
+   * @return the replica that is the best candidate for deletion
+   */
+  abstract public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
+                                      Block block, 
+                                      short replicationFactor,
+                                      Collection<DatanodeDescriptor> existingReplicas,
+                                      Collection<DatanodeDescriptor> moreExistingReplicas);
+
+  /**
+   * Used to set up a BlockPlacementPolicy object. This should be defined by 
+   * all implementations of a BlockPlacementPolicy.
+   * 
+   * @param conf the configuration object
+   * @param stats retrieve cluster status from here
+   * @param clusterMap cluster topology
+   */
+  abstract protected void initialize(Configuration conf,  FSClusterStats stats, 
+                                     NetworkTopology clusterMap);
+    
+  /**
+   * Get an instance of the configured Block Placement Policy based on the
+   * value of the configuration parameter dfs.block.replicator.classname.
+   * 
+   * @param conf the configuration to be used
+   * @param stats an object that is used to retrieve the load on the cluster
+   * @param clusterMap the network topology of the cluster
+   * @return an instance of BlockPlacementPolicy
+   */
+  public static BlockPlacementPolicy getInstance(Configuration conf, 
+                                                 FSClusterStats stats,
+                                                 NetworkTopology clusterMap) {
+    Class<? extends BlockPlacementPolicy> replicatorClass =
+                      conf.getClass("dfs.block.replicator.classname",
+                                    BlockPlacementPolicyDefault.class,
+                                    BlockPlacementPolicy.class);
+    BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
+                                                             replicatorClass, conf);
+    replicator.initialize(conf, stats, clusterMap);
+    return replicator;
+  }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>.
+   * If that is not possible, return as many as we can.
+   * 
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        blocksize);
+  }
+}
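
A minimal sketch of what a pluggable policy looks like against this abstract class; the class name is hypothetical and it simply delegates every decision to the default policy, where a real implementation would supply its own logic in chooseTarget. Because the primary chooseTarget overload above is package-private, the sketch has to live in the org.apache.hadoop.hdfs.server.namenode package:

    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.Collection;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.net.NetworkTopology;

    // Hypothetical pass-through policy: forwards everything to the default
    // rack-aware implementation.
    public class PassThroughPlacementPolicy extends BlockPlacementPolicy {
      private final BlockPlacementPolicyDefault delegate =
          new BlockPlacementPolicyDefault();

      @Override
      protected void initialize(Configuration conf, FSClusterStats stats,
                                NetworkTopology clusterMap) {
        delegate.initialize(conf, stats, clusterMap);
      }

      @Override
      DatanodeDescriptor[] chooseTarget(String srcPath, int numOfReplicas,
                                        DatanodeDescriptor writer,
                                        List<DatanodeDescriptor> chosenNodes,
                                        long blocksize) {
        // A path-aware policy would inspect srcPath here.
        return delegate.chooseTarget(srcPath, numOfReplicas, writer,
                                     chosenNodes, blocksize);
      }

      @Override
      public int verifyBlockPlacement(String srcPath, LocatedBlock lBlk,
                                      int minRacks) {
        return delegate.verifyBlockPlacement(srcPath, lBlk, minRacks);
      }

      @Override
      public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo srcInode,
          Block block, short replicationFactor,
          Collection<DatanodeDescriptor> existingReplicas,
          Collection<DatanodeDescriptor> moreExistingReplicas) {
        return delegate.chooseReplicaToDelete(srcInode, block, replicationFactor,
                                              existingReplicas, moreExistingReplicas);
      }
    }

A cluster would select it by setting dfs.block.replicator.classname to the fully qualified class name; note that with any non-default policy the Balancer change earlier in this patch refuses to run.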

+ 504 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java

@@ -0,0 +1,504 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine, 
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * in the same rack as the second replica but on a different node.
+ */
+public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+  private boolean considerLoad; 
+  private NetworkTopology clusterMap;
+  private FSClusterStats stats;
+
+  BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
+                           NetworkTopology clusterMap) {
+    initialize(conf, stats, clusterMap);
+  }
+
+  BlockPlacementPolicyDefault() {
+  }
+    
+  /** {@inheritDoc} */
+  public void initialize(Configuration conf,  FSClusterStats stats,
+                         NetworkTopology clusterMap) {
+    this.considerLoad = conf.getBoolean("dfs.replication.considerLoad", true);
+    this.stats = stats;
+    this.clusterMap = clusterMap;
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, null, blocksize);
+  }
+    
+  /**
+   * This is not part of the public API but is used by the unit tests.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+      
+    if (excludedNodes == null) {
+      excludedNodes = new HashMap<Node, Node>();
+    }
+     
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = chosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+      
+    int maxNodesPerRack = 
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+      
+    List<DatanodeDescriptor> results = 
+      new ArrayList<DatanodeDescriptor>(chosenNodes);
+    for (Node node:chosenNodes) {
+      excludedNodes.put(node, node);
+    }
+      
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+      
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
+                                                excludedNodes, blocksize, maxNodesPerRack, results);
+      
+    results.removeAll(chosenNodes);
+      
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+    
+  /* choose <i>numOfReplicas</i> from all data nodes */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+      
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+      
+    int numOfResults = results.size();
+    boolean newBlock = (numOfResults==0);
+    if (writer == null && !newBlock) {
+      writer = results.get(0);
+    }
+      
+    try {
+      if (numOfResults == 0) {
+        writer = chooseLocalNode(writer, excludedNodes, 
+                                 blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 1) {
+        chooseRemoteRack(1, results.get(0), excludedNodes, 
+                         blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      if (numOfResults <= 2) {
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, results.get(0), excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+        } else if (newBlock){
+          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
+                          maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, blocksize,
+                          maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          return writer;
+        }
+      }
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
+                   blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+    
+  /* choose <i>localMachine</i> as the target.
+   * if <i>localMachine</i> is not available, 
+   * choose a node on the same rack
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+      
+    // otherwise try local machine first
+    Node oldNode = excludedNodes.put(localMachine, localMachine);
+    if (oldNode == null) { // was not in the excluded list
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    } 
+      
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+                           blocksize, maxNodesPerRack, results);
+  }
+    
+  /* choose one node from the rack that <i>localMachine</i> is on.
+   * if no such node is available, choose one node from the rack where
+   * a second replica is on.
+   * if still no such node is available, choose a random node 
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+    }
+      
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+    
+  /* choose <i>numOfReplicas</i> nodes from the racks 
+   * that <i>localMachine</i> is NOT on.
+   * if not enough nodes are available, choose the remaining ones 
+   * from the local rack
+   */
+    
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                HashMap<Node, Node> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
+                   maxReplicasPerRack, results);
+    }
+  }
+
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          HashMap<Node, Node> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) 
+    throws NotEnoughReplicasException {
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) { // chosenNode was not in the excluded list
+        numOfAvailableNodes--;
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          results.add(chosenNode);
+          return chosenNode;
+        }
+      }
+    }
+
+    throw new NotEnoughReplicasException(
+        "Not able to place enough replicas");
+  }
+    
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            HashMap<Node, Node> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+      
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
+    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
+      DatanodeDescriptor chosenNode = 
+        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
+      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
+      if (oldNode == null) {
+        numOfAvailableNodes--;
+
+        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(chosenNode);
+        }
+      }
+    }
+      
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+                                           "Not able to place enough replicas");
+    }
+  }
+    
+  /* judge if a node is a good target.
+   * return true if <i>node</i> has enough space, 
+   * does not have too much load, and the rack does not have too many nodes
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+    
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+    Log logr = FSNamesystem.LOG;
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    long remaining = node.getRemaining() - 
+                     (node.getBlocksScheduled() * blockSize); 
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+      
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0 && stats != null) {
+        avgLoad = (double)stats.getTotalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+NodeBase.getPath(node)+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+      
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      Node result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+NodeBase.getPath(node)+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+    
+  /* Return a pipeline of nodes.
+   * The pipeline is formed by finding a shortest path that 
+   * starts from the writer and traverses all <i>nodes</i>.
+   * This is basically a traveling salesman problem.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+      
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = nodes[index];
+        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
+        int shortestIndex = index;
+        for(int i=index+1; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+
+  /** {@inheritDoc} */
+  public int verifyBlockPlacement(String srcPath,
+                                  LocatedBlock lBlk,
+                                  int minRacks) {
+    DatanodeInfo[] locs = lBlk.getLocations();
+    if (locs == null)
+      locs = new DatanodeInfo[0];
+    int numRacks = clusterMap.getNumOfRacks();
+    if(numRacks <= 1) // only one rack
+      return 0;
+    minRacks = Math.min(minRacks, numRacks);
+    // 1. Check that all locations are different.
+    // 2. Count locations on different racks.
+    Set<String> racks = new TreeSet<String>();
+    for (DatanodeInfo dn : locs)
+      racks.add(dn.getNetworkLocation());
+    return minRacks - racks.size();
+  }
+
+  /** {@inheritDoc} */
+  public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
+                                                 Block block,
+                                                 short replicationFactor,
+                                                 Collection<DatanodeDescriptor> first, 
+                                                 Collection<DatanodeDescriptor> second) {
+    long minSpace = Long.MAX_VALUE;
+    DatanodeDescriptor cur = null;
+
+    // pick replica from the first Set. If first is empty, then pick replicas
+    // from second set.
+    Iterator<DatanodeDescriptor> iter =
+          first.isEmpty() ? second.iterator() : first.iterator();
+
+    // pick node with least free space
+    while (iter.hasNext() ) {
+      DatanodeDescriptor node = iter.next();
+      long free = node.getRemaining();
+      if (minSpace > free) {
+        minSpace = free;
+        cur = node;
+      }
+    }
+    return cur;
+  }
+}
+
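
A standalone worked example (not part of the patch) of the per-rack cap computed in chooseTarget above, maxNodesPerRack = (totalNumOfReplicas - 1) / numRacks + 2:

    public class MaxNodesPerRackSketch {
      static int maxNodesPerRack(int totalReplicas, int numRacks) {
        return (totalReplicas - 1) / numRacks + 2;
      }

      public static void main(String[] args) {
        // 3 replicas across 10 racks: at most 2 per rack, so at least 2 racks.
        System.out.println(maxNodesPerRack(3, 10));  // 2
        // 3 replicas across 2 racks: the cap alone would allow all 3 on one
        // rack, but the chooseRemoteRack() step still pushes one replica off-rack.
        System.out.println(maxNodesPerRack(3, 2));   // 3
        // 10 replicas across 3 racks.
        System.out.println(maxNodesPerRack(10, 3));  // 5
      }
    }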

+ 36 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java

@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used for retrieving the load-related statistics of 
+ * the cluster.
+ */
+public interface FSClusterStats {
+
+  /**
+   * an indication of the total load of the cluster.
+   * 
+   * @return a count of the total number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+
+  public int getTotalLoad() ;
+}
+    
+    
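
For context, a standalone illustration of how the default policy consumes getTotalLoad() (see isGoodTarget earlier in this patch): a datanode is skipped as "too busy" when its transceiver count exceeds twice the cluster average. The numbers below are made up:

    public class ConsiderLoadSketch {
      static boolean tooBusy(int nodeXceivers, int totalLoad, int numDatanodes) {
        double avgLoad = (numDatanodes == 0) ? 0 : (double) totalLoad / numDatanodes;
        return nodeXceivers > 2.0 * avgLoad;
      }

      public static void main(String[] args) {
        // 400 active transfers over 100 datanodes -> average load 4.
        System.out.println(tooBusy(10, 400, 100));  // true: 10 > 8
        System.out.println(tooBusy(7, 400, 100));   // false
      }
    }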

+ 17 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -946,6 +946,23 @@ class FSDirectory implements Closeable {
     }
     return fullPathName.toString();
   }
+
+  /** Return the full path name of the specified inode */
+  static String getFullPathName(INode inode) {
+    // calculate the depth of this inode from root
+    int depth = 0;
+    for (INode i = inode; i != null; i = i.parent) {
+      depth++;
+    }
+    INode[] inodes = new INode[depth];
+
+    // fill up the inodes in the path from this inode to root
+    for (int i = 0; i < depth; i++) {
+      inodes[depth-i-1] = inode;
+      inode = inode.parent;
+    }
+    return getFullPathName(inodes, depth-1);
+  }
   
   /**
    * Create a directory 

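The new getFullPathName(INode) relies on a helper that is not shown in this hunk; the same two-pass pattern on a plain parent-linked node type (purely illustrative) looks like this:

    public class ParentWalkSketch {
      static class Node {
        final String name;
        final Node parent;
        Node(String name, Node parent) { this.name = name; this.parent = parent; }
      }

      // First pass counts the depth; second pass fills the path root-to-leaf.
      static String fullPath(Node node) {
        int depth = 0;
        for (Node n = node; n != null; n = n.parent) {
          depth++;
        }
        Node[] path = new Node[depth];
        for (int i = 0; i < depth; i++) {
          path[depth - i - 1] = node;
          node = node.parent;
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i < path.length; i++) {  // path[0] is the root ("")
          sb.append('/').append(path[i].name);
        }
        return sb.length() == 0 ? "/" : sb.toString();
      }

      public static void main(String[] args) {
        Node root = new Node("", null);
        Node user = new Node("user", root);
        Node file = new Node("data.txt", user);
        System.out.println(fullPath(file));  // /user/data.txt
      }
    }
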
+ 35 - 0
src/java/org/apache/hadoop/hdfs/server/namenode/FSInodeInfo.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** 
+ * This interface is used by the pluggable block placement policy
+ * to expose a few characteristics of an Inode.
+ */
+public interface FSInodeInfo {
+
+  /**
+   * a string representation of an inode
+   * 
+   * @return the full pathname (from root) that this inode represents
+   */
+
+  public String getFullPathName() ;
+}
+    
+    
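
Since the contract is a single method, a stub is enough to exercise the inode-based chooseTarget overload in isolation, for example in a unit test of a custom policy (the class below is hypothetical):

    import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo;

    public class StubInodeInfo implements FSInodeInfo {
      private final String fullPath;

      public StubInodeInfo(String fullPath) {
        this.fullPath = fullPath;
      }

      public String getFullPathName() {
        return fullPath;
      }
    }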

+ 44 - 22
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
@@ -90,7 +91,7 @@ import javax.security.auth.login.LoginException;
  * 4)  machine --> blocklist (inverted #2)
  * 5)  LRU cache of updated-heartbeat machines
  ***************************************************/
-public class FSNamesystem implements FSConstants, FSNamesystemMBean {
+public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterStats {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
   public static final String AUDIT_FORMAT =
     "ugi=%s\t" +  // ugi
@@ -818,6 +819,24 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return dir.getPreferredBlockSize(filename);
   }
 
+  /*
+   * Verify that parent dir exists
+   */
+  private void verifyParentDir(String src) throws FileAlreadyExistsException,
+      FileNotFoundException {
+    Path parent = new Path(src).getParent();
+    if (parent != null) {
+      INode[] pathINodes = dir.getExistingPathINodes(parent.toString());
+      if (pathINodes[pathINodes.length - 1] == null) {
+        throw new FileNotFoundException("Parent directory doesn't exist: "
+            + parent.toString());
+      } else if (!pathINodes[pathINodes.length - 1].isDirectory()) {
+        throw new FileAlreadyExistsException("Parent path is not a directory: "
+            + parent.toString());
+      }
+    }
+  }
+
   /**
    * Create a new file entry in the namespace.
    * 
@@ -828,10 +847,11 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
    */
   void startFile(String src, PermissionStatus permissions,
                  String holder, String clientMachine,
-                 EnumSet<CreateFlag> flag, short replication, long blockSize
+                 EnumSet<CreateFlag> flag, boolean createParent, 
+                 short replication, long blockSize
                 ) throws IOException {
     startFileInternal(src, permissions, holder, clientMachine, flag,
-        replication, blockSize);
+        createParent, replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -846,6 +866,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                                               String holder, 
                                               String clientMachine, 
                                               EnumSet<CreateFlag> flag,
+                                              boolean createParent,
                                               short replication,
                                               long blockSize
                                               ) throws IOException {
@@ -857,6 +878,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
+          + ", createParent=" + createParent
           + ", replication=" + replication
           + ", overwrite=" + overwrite
           + ", append=" + append);
@@ -883,6 +905,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       }
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     try {
       INode myFile = dir.getFileINode(src);
       if (myFile != null && myFile.isUnderConstruction()) {
@@ -940,7 +966,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
           else {
             //append & create a nonexist file equals to overwrite
             this.startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), replication, blockSize);
+                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
             return;
           }
         } else if (myFile.isDirectory()) {
@@ -1016,7 +1042,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                             " Please refer to dfs.support.append configuration parameter.");
     }
     startFileInternal(src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), 
-                      (short)blockManager.maxReplication, (long)0);
+                      false, (short)blockManager.maxReplication, (long)0);
     getEditLog().logSync();
 
     //
@@ -1128,7 +1154,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        replication, clientNode, null, blockSize);
+        src, replication, clientNode, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
@@ -1511,9 +1537,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   /**
    * Create all the necessary directories
    */
-  public boolean mkdirs(String src, PermissionStatus permissions
-      ) throws IOException {
-    boolean status = mkdirsInternal(src, permissions);
+  public boolean mkdirs(String src, PermissionStatus permissions,
+      boolean createParent) throws IOException {
+    boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
       final FileStatus stat = dir.getFileInfo(src);
@@ -1528,7 +1554,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
    * Create all the necessary directories
    */
   private synchronized boolean mkdirsInternal(String src,
-      PermissionStatus permissions) throws IOException {
+      PermissionStatus permissions, boolean createParent) throws IOException {
     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
     if (isPermissionEnabled) {
       checkTraverse(src);
@@ -1547,6 +1573,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       checkAncestorAccess(src, FsAction.WRITE);
     }
 
+    if (!createParent) {
+      verifyParentDir(src);
+    }
+
     // validate that we have enough inodes. This is, at best, a 
     // heuristic because the mkdirs() operation might need to 
     // create multiple inodes.
@@ -2372,8 +2402,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
                               Block b, short replication,
                               DatanodeDescriptor addedNode,
-                              DatanodeDescriptor delNodeHint) {
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
     // first form a rack to datanodes map and
+    INodeFile inode = blockManager.getINode(b);
     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
       new HashMap<String, ArrayList<DatanodeDescriptor>>();
     for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
@@ -2417,17 +2449,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
             (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        Iterator<DatanodeDescriptor> iter = 
-          priSet.isEmpty() ? remains.iterator() : priSet.iterator();
-          while( iter.hasNext() ) {
-            DatanodeDescriptor node = iter.next();
-            long free = node.getRemaining();
-
-            if (minSpace > free) {
-              minSpace = free;
-              cur = node;
-            }
-          }
+        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
       }
 
       firstOne = false;

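The excess-replica loop above now delegates the final choice to the placement policy. A simplified model of what the default policy does with the two sets it receives (plain strings and longs stand in for DatanodeDescriptor; all names and sizes are made up): use the preferred set, the replicas whose removal keeps the block on the same number of racks, unless it is empty, and within it drop the node with the least remaining space.

    import java.util.HashMap;
    import java.util.Map;

    public class ChooseReplicaToDeleteSketch {
      // Mirrors BlockPlacementPolicyDefault.chooseReplicaToDelete(): fall back
      // to the second set only if the preferred set is empty, then pick the
      // node with the least free space.
      static String pick(Map<String, Long> preferred, Map<String, Long> others) {
        Map<String, Long> candidates = preferred.isEmpty() ? others : preferred;
        String cur = null;
        long minSpace = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : candidates.entrySet()) {
          if (e.getValue() < minSpace) {
            minSpace = e.getValue();
            cur = e.getKey();
          }
        }
        return cur;
      }

      public static void main(String[] args) {
        Map<String, Long> priSet = new HashMap<String, Long>();  // removal keeps rack count
        priSet.put("dn1", 40L << 30);   // 40 GB free
        priSet.put("dn2", 10L << 30);   // 10 GB free
        Map<String, Long> remains = new HashMap<String, Long>();
        remains.put("dn3", 5L << 30);   // tighter, but not in the preferred set
        System.out.println(pick(priSet, remains));  // dn2
      }
    }
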
+ 7 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  * This is a base INode class containing common fields for file and 
  * directory inodes.
  */
-abstract class INode implements Comparable<byte[]> {
+abstract class INode implements Comparable<byte[]>, FSInodeInfo {
   protected byte[] name;
   protected INodeDirectory parent;
   protected long modificationTime;
@@ -246,6 +246,12 @@ abstract class INode implements Comparable<byte[]> {
     this.name = name;
   }
 
+  /** {@inheritDoc} */
+  public String getFullPathName() {
+    // Get the full path name of this inode.
+    return FSDirectory.getFullPathName(this);
+  }
+
   /** {@inheritDoc} */
   public String toString() {
     return "\"" + getLocalName() + "\":" + getPermissionStatus();

+ 4 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -556,6 +556,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
                      FsPermission masked,
                              String clientName, 
                              EnumSetWritable<CreateFlag> flag,
+                             boolean createParent,
                              short replication,
                              long blockSize
                              ) throws IOException {
@@ -571,7 +572,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     namesystem.startFile(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
             null, masked),
-        clientName, clientMachine, flag.get(), replication, blockSize);
+        clientName, clientMachine, flag.get(), createParent, replication, blockSize);
     myMetrics.numFilesCreated.inc();
     myMetrics.numCreateFileOps.inc();
   }
@@ -730,7 +731,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   }
     
   /** {@inheritDoc} */
-  public boolean mkdirs(String src, FsPermission masked) throws IOException {
+  public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException {
     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
     if (!checkPathLength(src)) {
       throw new IOException("mkdirs: Pathname too long.  Limit " 
@@ -738,7 +739,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     }
     return namesystem.mkdirs(src,
         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
-            null, masked));
+            null, masked), createParent);
   }
 
   /**

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -253,8 +253,8 @@ public class NamenodeFsck {
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
-                    lBlk, targetFileReplication, networktopology);
+      int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
+                           verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
@@ -335,7 +335,7 @@ public class NamenodeFsck {
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      if (!namenode.mkdirs(target, file.getPermission())) {
+      if (!namenode.mkdirs(target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         return;
       }
@@ -501,7 +501,7 @@ public class NamenodeFsck {
       
       final FileStatus lfStatus = dfs.getFileInfo(lfName);
       if (lfStatus == null) { // not exists
-        lfInitedOk = dfs.mkdirs(lfName);
+        lfInitedOk = dfs.mkdirs(lfName, null, true);
         lostFound = lfName;
       } else if (!lfStatus.isDir()) { // exists but not a directory
         LOG.warn("Cannot use /lost+found : a regular file with this name exists.");

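With the default policy, verifyBlockPlacement returns the requested minimum number of racks minus the number of distinct racks actually holding a replica (ignoring the single-rack short circuit), and fsck now requests min(2, replication). A standalone worked example with made-up rack names:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class RackCheckSketch {
      static int missingRacks(int minRacks, String... replicaRacks) {
        Set<String> racks = new HashSet<String>(Arrays.asList(replicaRacks));
        return minRacks - racks.size();
      }

      public static void main(String[] args) {
        // Three replicas all on one rack: one rack short, so fsck counts the
        // block as mis-replicated.
        System.out.println(missingRacks(2, "/rack1", "/rack1", "/rack1"));  // 1
        System.out.println(missingRacks(2, "/rack1", "/rack2", "/rack1"));  // 0
      }
    }
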
+ 0 - 514
src/java/org/apache/hadoop/hdfs/server/namenode/ReplicationTargetChooser.java

@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.commons.logging.*;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import java.util.*;
-
-/** The class is responsible for choosing the desired number of targets
- * for placing block replicas.
- * The replica placement strategy is that if the writer is on a datanode,
- * the 1st replica is placed on the local machine, 
- * otherwise a random datanode. The 2nd replica is placed on a datanode
- * that is on a different rack. The 3rd replica is placed on a datanode
- * which is on a different node of the rack as the second replica.
- */
-class ReplicationTargetChooser {
-  private final boolean considerLoad; 
-  private NetworkTopology clusterMap;
-  private FSNamesystem fs;
-    
-  ReplicationTargetChooser(boolean considerLoad,  FSNamesystem fs,
-                           NetworkTopology clusterMap) {
-    this.considerLoad = considerLoad;
-    this.fs = fs;
-    this.clusterMap = clusterMap;
-  }
-    
-  private static class NotEnoughReplicasException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    NotEnoughReplicasException(String msg) {
-      super(msg);
-    }
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
-   * a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as targets
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    return chooseTarget(numOfReplicas, writer, 
-                        new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
-  }
-    
-  /**
-   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-   * to re-replicate a block with size <i>blocksize</i> 
-   * If not, return as many as we can.
-   * 
-   * @param numOfReplicas: additional number of replicas wanted.
-   * @param writer: the writer's machine, null if not in the cluster.
-   * @param choosenNodes: datanodes that have been chosen as targets.
-   * @param excludedNodes: datanodes that should not be considered targets.
-   * @param blocksize: size of the data to be written.
-   * @return array of DatanodeDescriptor instances chosen as target 
-   * and sorted as a pipeline.
-   */
-  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
-                                    List<DatanodeDescriptor> choosenNodes,
-                                    HashMap<Node, Node> excludedNodes,
-                                    long blocksize) {
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return new DatanodeDescriptor[0];
-    }
-      
-    if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
-    }
-      
-    int clusterSize = clusterMap.getNumOfLeaves();
-    int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
-    if (totalNumOfReplicas > clusterSize) {
-      numOfReplicas -= (totalNumOfReplicas-clusterSize);
-      totalNumOfReplicas = clusterSize;
-    }
-      
-    int maxNodesPerRack = 
-      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-      
-    List<DatanodeDescriptor> results = 
-      new ArrayList<DatanodeDescriptor>(choosenNodes);
-    for (Node node:choosenNodes) {
-      excludedNodes.put(node, node);
-    }
-      
-    if (!clusterMap.contains(writer)) {
-      writer=null;
-    }
-      
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
-      
-    results.removeAll(choosenNodes);
-      
-    // sorting nodes to form a pipeline
-    return getPipeline((writer==null)?localNode:writer,
-                       results.toArray(new DatanodeDescriptor[results.size()]));
-  }
-    
-  /* choose <i>numOfReplicas</i> from all data nodes */
-  private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                          DatanodeDescriptor writer,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
-    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-      return writer;
-    }
-      
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
-    if (writer == null && !newBlock) {
-      writer = results.get(0);
-    }
-      
-    try {
-      if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      if (numOfResults <= 2) {
-        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-          chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
-        } else if (newBlock){
-          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
-        } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
-        }
-        if (--numOfReplicas == 0) {
-          return writer;
-        }
-      }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
-               + numOfReplicas);
-    }
-    return writer;
-  }
-    
-  /* choose <i>localMachine</i> as the target.
-   * if <i>localMachine</i> is not available, 
-   * choose a node on the same rack
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // if no local machine, randomly choose one node
-    if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-      
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (isGoodTarget(localMachine, blocksize,
-                       maxNodesPerRack, false, results)) {
-        results.add(localMachine);
-        return localMachine;
-      }
-    } 
-      
-    // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
-  }
-    
-  /* choose one node from the rack that <i>localMachine</i> is on.
-   * if no such node is available, choose one node from the rack
-   * on which a second replica resides.
-   * if still no such node is available, choose a random node 
-   * in the cluster.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    // no local machine, so choose a random machine
-    if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
-    }
-      
-    // choose one from the local rack
-    try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(Iterator<DatanodeDescriptor> iter=results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor nextNode = iter.next();
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
-        }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
-      }
-    }
-  }
-    
-  /* choose <i>numOfReplicas</i> nodes from the racks 
-   * that <i>localMachine</i> is NOT on.
-   * if not enough nodes are available, choose the remaining ones 
-   * from the local rack
-   */
-    
-  private void chooseRemoteRack(int numOfReplicas,
-                                DatanodeDescriptor localMachine,
-                                HashMap<Node, Node> excludedNodes,
-                                long blocksize,
-                                int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-    int oldNumOfReplicas = results.size();
-    // randomly choose one node from remote racks
-    try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
-    } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
-    }
-  }
-
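In the remote-rack step above, the "~" prefix on the scope string tells NetworkTopology.chooseRandom() to pick from anywhere except the writer's rack; only when that scope is exhausted does the catch block fall back to choosing the shortfall from the writer's own rack.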
-  /* Randomly choose one target from <i>nodes</i>.
-   * @return the chosen node
-   */
-  private DatanodeDescriptor chooseRandom(
-                                          String nodes,
-                                          HashMap<Node, Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
-    throws NotEnoughReplicasException {
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) { // choosenNode was not in the excluded list
-        numOfAvailableNodes--;
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          results.add(choosenNode);
-          return choosenNode;
-        }
-      }
-    }
-
-    throw new NotEnoughReplicasException(
-        "Not able to place enough replicas");
-  }
-    
-  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
-   */
-  private void chooseRandom(int numOfReplicas,
-                            String nodes,
-                            HashMap<Node, Node> excludedNodes,
-                            long blocksize,
-                            int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
-      
-    int numOfAvailableNodes =
-      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
-    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
-      DatanodeDescriptor choosenNode = 
-        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
-      Node oldNode = excludedNodes.put(choosenNode, choosenNode);
-      if (oldNode == null) {
-        numOfAvailableNodes--;
-
-        if (isGoodTarget(choosenNode, blocksize, maxNodesPerRack, results)) {
-          numOfReplicas--;
-          results.add(choosenNode);
-        }
-      }
-    }
-      
-    if (numOfReplicas>0) {
-      throw new NotEnoughReplicasException(
-                                           "Not able to place enough replicas");
-    }
-  }
-    
-  /* judge if a node is a good target.
-   * return true if <i>node</i> has enough space, 
-   * does not have too much load, and its rack does not already have too many chosen nodes
-   */
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
-  }
-    
-  private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
-    Log logr = FSNamesystem.LOG;
-    // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node is (being) decommissioned");
-      return false;
-    }
-
-    long remaining = node.getRemaining() - 
-                     (node.getBlocksScheduled() * blockSize); 
-    // check the remaining capacity of the target machine
-    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the node does not have enough space");
-      return false;
-    }
-      
-    // check the communication traffic of the target machine
-    if (considerLoad) {
-      double avgLoad = 0;
-      int size = clusterMap.getNumOfLeaves();
-      if (size != 0) {
-        avgLoad = (double)fs.getTotalLoad()/size;
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logr.debug("Node "+NodeBase.getPath(node)+
-                  " is not chosen because the node is too busy");
-        return false;
-      }
-    }
-      
-    // check if the target rack has chosen too many nodes
-    String rackname = node.getNetworkLocation();
-    int counter=1;
-    for(Iterator<DatanodeDescriptor> iter = results.iterator();
-        iter.hasNext();) {
-      Node result = iter.next();
-      if (rackname.equals(result.getNetworkLocation())) {
-        counter++;
-      }
-    }
-    if (counter>maxTargetPerLoc) {
-      logr.debug("Node "+NodeBase.getPath(node)+
-                " is not chosen because the rack has too many chosen nodes");
-      return false;
-    }
-    return true;
-  }
-    
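Condensed, the acceptance test above reduces to four predicates: the node is not (being) decommissioned, it has room for MIN_BLOCKS_FOR_WRITE blocks beyond what is already scheduled to it, it is not more than twice as busy as the cluster average, and its rack has not already supplied the per-rack maximum of chosen targets. A hedged restatement with plain parameters in place of DatanodeDescriptor (names are illustrative only):

public class GoodTargetSketch {
  // Mirrors the checks performed by isGoodTarget() above, in the same order.
  static boolean isGoodTarget(boolean decommissioning,
                              long remainingBytes, int blocksScheduled,
                              long blockSize, int minBlocksForWrite,
                              int xceiverCount, double avgLoad,
                              int chosenOnSameRack, int maxNodesPerRack) {
    if (decommissioning) {
      return false;                                          // node is leaving the cluster
    }
    long remaining = remainingBytes - (long) blocksScheduled * blockSize;
    if ((long) minBlocksForWrite * blockSize > remaining) {
      return false;                                          // not enough free space
    }
    if (xceiverCount > 2.0 * avgLoad) {
      return false;                                          // node is too busy
    }
    return chosenOnSameRack + 1 <= maxNodesPerRack;          // rack quota not exceeded
  }
}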
-  /* Return a pipeline of nodes.
-   * The pipeline is formed by finding a shortest path that
-   * starts from the writer and traverses all <i>nodes</i>.
-   * This is basically a traveling salesman problem.
-   */
-  private DatanodeDescriptor[] getPipeline(
-                                           DatanodeDescriptor writer,
-                                           DatanodeDescriptor[] nodes) {
-    if (nodes.length==0) return nodes;
-      
-    synchronized(clusterMap) {
-      int index=0;
-      if (writer == null || !clusterMap.contains(writer)) {
-        writer = nodes[0];
-      }
-      for(;index<nodes.length; index++) {
-        DatanodeDescriptor shortestNode = nodes[index];
-        int shortestDistance = clusterMap.getDistance(writer, shortestNode);
-        int shortestIndex = index;
-        for(int i=index+1; i<nodes.length; i++) {
-          DatanodeDescriptor currentNode = nodes[i];
-          int currentDistance = clusterMap.getDistance(writer, currentNode);
-          if (shortestDistance>currentDistance) {
-            shortestDistance = currentDistance;
-            shortestNode = currentNode;
-            shortestIndex = i;
-          }
-        }
-        //switch position index & shortestIndex
-        if (index != shortestIndex) {
-          nodes[shortestIndex] = nodes[index];
-          nodes[index] = shortestNode;
-        }
-        writer = shortestNode;
-      }
-    }
-    return nodes;
-  }
-
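The loop above is a greedy nearest-neighbour ordering (an in-place selection sort keyed by network distance from the previously placed node), not an exact solution of the travelling-salesman formulation the comment alludes to. A generic sketch with the distance function abstracted away (in HDFS it is NetworkTopology.getDistance()):

public class PipelineOrderSketch {
  interface Distance<T> {
    int between(T a, T b);
  }

  // At each position, swap in the remaining node closest to the node placed just
  // before it, starting from the writer.
  static <T> void orderPipeline(T writer, T[] nodes, Distance<T> d) {
    T current = writer;
    for (int i = 0; i < nodes.length; i++) {
      int nearest = i;
      for (int j = i + 1; j < nodes.length; j++) {
        if (d.between(current, nodes[j]) < d.between(current, nodes[nearest])) {
          nearest = j;
        }
      }
      T tmp = nodes[i]; nodes[i] = nodes[nearest]; nodes[nearest] = tmp;
      current = nodes[i];
    }
  }
}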
-  /**
-   * Verify that the block is replicated on at least 2 different racks
-   * if there is more than one rack in the system.
-   * 
-   * @param lBlk block with locations
-   * @param cluster 
-   * @return 1 if the block must be replicated on an additional rack,
-   * or 0 if the number of racks is sufficient.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         short replication,
-                                         NetworkTopology cluster) {
-    int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
-    return numRacks < 0 ? 0 : numRacks;
-  }
-
-  /**
-   * Verify that the block is replicated on at least minRacks different racks
-   * if there are more than minRacks racks in the system.
-   * 
-   * @param lBlk block with locations
-   * @param minRacks number of racks the block should be replicated to
-   * @param cluster 
-   * @return the difference between the required and the actual number of racks
-   * the block is replicated to.
-   */
-  public static int verifyBlockPlacement(LocatedBlock lBlk,
-                                         int minRacks,
-                                         NetworkTopology cluster) {
-    DatanodeInfo[] locs = lBlk.getLocations();
-    if (locs == null)
-      locs = new DatanodeInfo[0];
-    int numRacks = cluster.getNumOfRacks();
-    if(numRacks <= 1) // only one rack
-      return 0;
-    minRacks = Math.min(minRacks, numRacks);
-    // Count the number of distinct racks among the replica locations.
-    Set<String> racks = new TreeSet<String>();
-    for (DatanodeInfo dn : locs)
-      racks.add(dn.getNetworkLocation());
-    return minRacks - racks.size();
-  }
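For example, with minRacks = 2 and three replicas located on /d1/r1, /d1/r1 and /d2/r3, the rack set has size 2 and the method returns 0 (placement is sufficient); if all three replicas sat on /d1/r1 it would return 1, i.e. one additional rack is still needed.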
-} //end of Replicator
-
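This change retires ReplicationTargetChooser in favour of the pluggable BlockPlacementPolicy abstraction, with BlockPlacementPolicyDefault carrying the same rack-aware logic. The fragments below are lifted from the updated TestReplicationPolicy later in this diff and show the shape of the new API ("replicator" is a BlockPlacementPolicy, "repl" its default implementation); they are illustrative snippets, not a complete program:

  DatanodeDescriptor[] targets;

  // choose three targets for a new block of /dummyfile.txt written from dataNodes[0]
  targets = replicator.chooseTarget("/dummyfile.txt", 3, dataNodes[0], BLOCK_SIZE);

  // re-replication: two more targets, given one replica that already exists
  List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
  chosenNodes.add(dataNodes[0]);
  targets = replicator.chooseTarget("/dummyfile.txt", 2, dataNodes[0], chosenNodes, BLOCK_SIZE);

  // the default implementation additionally accepts a map of nodes to exclude
  HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
  excludedNodes.put(dataNodes[1], dataNodes[1]);
  targets = repl.chooseTarget(2, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);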

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -157,7 +157,7 @@ public class TestDFSClientRetries extends TestCase {
     
     public FsServerDefaults getServerDefaults() throws IOException { return null; }
     
-    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, short replication, long blockSize) throws IOException {}
+    public void create(String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws IOException {}
     
     public LocatedBlock append(String src, String clientName) throws IOException { return null; }
 
@@ -179,7 +179,7 @@ public class TestDFSClientRetries extends TestCase {
 
     public boolean delete(String src, boolean recursive) throws IOException { return false; }
 
-    public boolean mkdirs(String src, FsPermission masked) throws IOException { return false; }
+    public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
 
     public FileStatus[] getListing(String src) throws IOException { return null; }
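The extra boolean in both ClientProtocol signatures above is the new createParent flag. The tests added below (TestDFSMkdirs.testMkdir and TestFileCreation.testFileCreationNonRecursive) check that with non-recursive semantics a missing parent directory yields FileNotFoundException and a parent that exists as a file yields FileAlreadyExistsException, rather than parents being created implicitly. A hedged caller-side sketch against the signatures above ("namenode" and "clientName" are assumed to exist and are not part of this diff):

  boolean createParent = false;  // do not create missing ancestor directories
  namenode.mkdirs("/existing/dir/newdir", FsPermission.getDefault(), createParent);
  namenode.create("/existing/dir/newfile", FsPermission.getDefault(), clientName,
      new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.CREATE)),
      createParent, (short) 3, 64L * 1024 * 1024);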
 

+ 44 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSMkdirs.java

@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs;
 import junit.framework.TestCase;
 import java.io.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 
 /**
@@ -73,4 +75,46 @@ public class TestDFSMkdirs extends TestCase {
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Tests mkdir will not create directory when parent is missing.
+   */
+  public void testMkdir() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
+    try {
+      // Create a dir in root dir, should succeed
+      assertTrue(dfs.mkdir(new Path("/mkdir-" + System.currentTimeMillis()),
+          FsPermission.getDefault()));
+      // Create a dir when parent dir exists as a file, should fail
+      IOException expectedException = null;
+      String filePath = "/mkdir-file-" + System.currentTimeMillis();
+      writeFile(dfs, new Path(filePath));
+      try {
+        dfs.mkdir(new Path(filePath + "/mkdir"), FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory when parent dir exists as file using"
+          + " mkdir() should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      // Create a dir in a non-exist directory, should fail
+      expectedException = null;
+      try {
+        dfs.mkdir(new Path("/non-exist/mkdir-" + System.currentTimeMillis()),
+            FsPermission.getDefault());
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a directory in a non-exist parent dir using"
+          + " mkdir() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+    } finally {
+      dfs.close();
+      cluster.shutdown();
+    }
+  }
 }

+ 95 - 0
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -735,6 +737,99 @@ public class TestFileCreation extends junit.framework.TestCase {
     }
   }
   
+  /**
+   * Test file creation using createNonRecursive().
+   */
+  public void testFileCreationNonRecursive() throws IOException {
+    Configuration conf = new Configuration();
+    if (simulatedStorage) {
+      conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    final Path path = new Path("/" + System.currentTimeMillis()
+        + "-testFileCreationNonRecursive");
+    FSDataOutputStream out = null;
+
+    try {
+      IOException expectedException = null;
+      final String nonExistDir = "/non-exist-" + System.currentTimeMillis();
+
+      fs.delete(new Path(nonExistDir), true);
+      EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+      // Create a new file in root dir, should succeed
+      out = createNonRecursive(fs, path, 1, createFlag);
+      out.close();
+      // Create a file when parent dir exists as file, should fail
+      expectedException = null;
+      try {
+        createNonRecursive(fs, new Path(path, "Create"), 1, createFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a file when parent directory exists as a file"
+          + " should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      fs.delete(path, true);
+      // Create a file in a non-exist directory, should fail
+      final Path path2 = new Path(nonExistDir + "/testCreateNonRecursive");
+      expectedException = null;
+      try {
+        createNonRecursive(fs, path2, 1, createFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Create a file in a non-exist dir using"
+          + " createNonRecursive() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+
+      EnumSet<CreateFlag> overwriteFlag = EnumSet.of(CreateFlag.OVERWRITE);
+      // Overwrite a file in root dir, should succeed
+      out = createNonRecursive(fs, path, 1, overwriteFlag);
+      out.close();
+      // Overwrite a file when parent dir exists as file, should fail
+      expectedException = null;
+      try {
+        createNonRecursive(fs, new Path(path, "Overwrite"), 1, overwriteFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Overwrite a file when parent directory exists as a file"
+          + " should throw FileAlreadyExistsException ",
+          expectedException != null
+              && expectedException instanceof FileAlreadyExistsException);
+      fs.delete(path, true);
+      // Overwrite a file in a non-exist directory, should fail
+      final Path path3 = new Path(nonExistDir + "/testOverwriteNonRecursive");
+      expectedException = null;
+      try {
+        createNonRecursive(fs, path3, 1, overwriteFlag);
+      } catch (IOException e) {
+        expectedException = e;
+      }
+      assertTrue("Overwrite a file in a non-exist dir using"
+          + " createNonRecursive() should throw FileNotFoundException ",
+          expectedException != null
+              && expectedException instanceof FileNotFoundException);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  // creates a file using DistributedFileSystem.createNonRecursive()
+  static FSDataOutputStream createNonRecursive(FileSystem fs, Path name,
+      int repl, EnumSet<CreateFlag> flag) throws IOException {
+    System.out.println("createNonRecursive: Created " + name + " with " + repl
+        + " replica.");
+    FSDataOutputStream stm = ((DistributedFileSystem) fs).createNonRecursive(
+        name, FsPermission.getDefault(), flag, fs.getConf().getInt(
+            "io.file.buffer.size", 4096), (short) repl, (long) blockSize, null);
+    return stm;
+  }
+  
   // creates a file with the flag api
   static FSDataOutputStream createFileWithFlag(FileSystem fileSys, Path name, int repl, EnumSet<CreateFlag> flag)
     throws IOException {

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

@@ -516,7 +516,7 @@ public class NNThroughputBenchmark {
       // dummyActionNoSynch(fileIdx);
       nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
                       clientName, new EnumSetWritable<CreateFlag>(EnumSet
-              .of(CreateFlag.OVERWRITE)), replication, BLOCK_SIZE);
+              .of(CreateFlag.OVERWRITE)), true, replication, BLOCK_SIZE);
       long end = System.currentTimeMillis();
       for(boolean written = !closeUponCreate; !written; 
         written = nameNode.complete(fileNames[daemonId][inputIdx],
@@ -895,7 +895,7 @@ public class NNThroughputBenchmark {
       for(int idx=0; idx < nrFiles; idx++) {
         String fileName = nameGenerator.getNextFileName("ThroughputBench");
         nameNode.create(fileName, FsPermission.getDefault(), clientName,
-            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), replication,
+            new EnumSetWritable<CreateFlag>(EnumSet.of(CreateFlag.OVERWRITE)), true, replication,
             BLOCK_SIZE);
         Block lastBlock = addBlocks(fileName, clientName);
         nameNode.complete(fileName, clientName, lastBlock);

+ 78 - 71
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

@@ -38,7 +38,8 @@ public class TestReplicationPolicy extends TestCase {
   private static final Configuration CONF = new Configuration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
-  private static final ReplicationTargetChooser replicator;
+  private static final BlockPlacementPolicy replicator;
+  private static final String filename = "/dummyfile.txt";
   private static final DatanodeDescriptor dataNodes[] = 
     new DatanodeDescriptor[] {
       new DatanodeDescriptor(new DatanodeID("h1:5020"), "/d1/r1"),
@@ -91,30 +92,30 @@ public class TestReplicationPolicy extends TestCase {
         FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
 
-    targets = replicator.chooseTarget(
-                                     4, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                     4, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@@ -137,41 +138,47 @@ public class TestReplicationPolicy extends TestCase {
   public void testChooseTarget2() throws Exception { 
     HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
+    BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     
     excludedNodes = new HashMap<Node, Node>();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                0, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                1, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                2, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                3, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     
     excludedNodes.clear();
+    chosenNodes.clear();
     excludedNodes.put(dataNodes[1], dataNodes[1]); 
-    targets = replicator.chooseTarget(
-                                      4, dataNodes[0], excludedNodes, BLOCK_SIZE);
+    targets = repl.chooseTarget(
+                                4, dataNodes[0], chosenNodes, excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
     for(int i=1; i<4; i++) {
@@ -197,30 +204,30 @@ public class TestReplicationPolicy extends TestCase {
         (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
         
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[1]);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[1]);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      4, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      4, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[1]);
     for(int i=1; i<4; i++) {
@@ -252,23 +259,23 @@ public class TestReplicationPolicy extends TestCase {
     }
       
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], BLOCK_SIZE);
     assertEquals(targets.length, 3);
     for(int i=0; i<3; i++) {
       assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@@ -292,21 +299,21 @@ public class TestReplicationPolicy extends TestCase {
    */
   public void testChooseTarget5() throws Exception {
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     
-    targets = replicator.chooseTarget(
-                                      2, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, NODE, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, NODE, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));    
@@ -324,23 +331,23 @@ public class TestReplicationPolicy extends TestCase {
     chosenNodes.add(dataNodes[0]);    
     DatanodeDescriptor[] targets;
     
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
     
-    targets = replicator.chooseTarget(
-                                      3, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      3, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 3);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
@@ -359,17 +366,17 @@ public class TestReplicationPolicy extends TestCase {
     chosenNodes.add(dataNodes[1]);
 
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
@@ -388,29 +395,29 @@ public class TestReplicationPolicy extends TestCase {
     chosenNodes.add(dataNodes[2]);
     
     DatanodeDescriptor[] targets;
-    targets = replicator.chooseTarget(
-                                      0, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      0, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 0);
     
-    targets = replicator.chooseTarget(
-                                      1, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      1, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
     
-    targets = replicator.chooseTarget(
-                               1, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                               1, dataNodes[2], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 1);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
     assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
 
-    targets = replicator.chooseTarget(
-                                      2, dataNodes[0], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                                      2, dataNodes[0], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
     
-    targets = replicator.chooseTarget(
-                               2, dataNodes[2], chosenNodes, null, BLOCK_SIZE);
+    targets = replicator.chooseTarget(filename,
+                               2, dataNodes[2], chosenNodes, BLOCK_SIZE);
     assertEquals(targets.length, 2);
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }