
HDFS-946. NameNode should not return full path name when listing a directory or getting the status of a file. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@915089 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang, 15 years ago
Parent commit: 260e697611
24 changed files with 470 additions and 160 deletions
  1. CHANGES.txt (+3 -0)
  2. src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java (+4 -5)
  3. src/java/org/apache/hadoop/fs/Hdfs.java (+17 -6)
  4. src/java/org/apache/hadoop/hdfs/DFSClient.java (+4 -4)
  5. src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (+3 -2)
  6. src/java/org/apache/hadoop/hdfs/DFSUtil.java (+25 -0)
  7. src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (+8 -6)
  8. src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (+6 -6)
  9. src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (+236 -0)
  10. src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (+7 -6)
  11. src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (+12 -12)
  12. src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (+3 -3)
  13. src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (+2 -1)
  14. src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+15 -17)
  15. src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (+9 -8)
  16. src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (+11 -28)
  17. src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (+3 -2)
  18. src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (+13 -10)
  19. src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (+4 -4)
  20. src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (+15 -14)
  21. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (+2 -2)
  22. src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java (+4 -4)
  23. src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java (+61 -16)
  24. src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java (+3 -4)
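
Before the per-file diffs, the change in a nutshell: getFileInfo and getListing now return HdfsFileStatus objects that carry only each entry's local name (the last path component), and the caller re-attaches the parent path it already knows, instead of the NameNode building and shipping full path strings. A minimal caller-side sketch, hedged: it uses only methods added in the diffs below (DFSClient.listPaths, HdfsFileStatus.getFullPath/isDir/getLen); the class name, directory, and printing are illustrative, not part of the commit.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

// Hypothetical caller: list a directory and rebuild the absolute paths that
// getListing() used to ship over the wire before this change.
class ListingSketch {
  static void printListing(DFSClient dfs, String dir) throws java.io.IOException {
    HdfsFileStatus[] entries = dfs.listPaths(dir); // each entry holds only its local name
    if (entries == null) {
      return;                                      // directory does not exist
    }
    Path parent = new Path(dir);
    for (HdfsFileStatus e : entries) {
      // The client, not the NameNode, pays the cost of building the full path.
      Path full = e.getFullPath(parent);           // parent + local name
      System.out.println(full + (e.isDir() ? " <dir>" : " " + e.getLen() + " bytes"));
    }
  }
}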

+ 3 - 0
CHANGES.txt

@@ -72,6 +72,9 @@ Trunk (unreleased changes)
 
   OPTIMIZATIONS
 
+    HDFS-946. NameNode should not return full path name when lisitng a
+    diretory or getting the status of a file. (hairong)
+
   BUG FIXES
     HDFS-913. Rename fault injection test TestRename.java to TestFiRename.java
     to include it in tests run by ant target run-test-hdfs-fault-inject.

+ 4 - 5
src/contrib/hdfsproxy/src/java/org/apache/hadoop/hdfsproxy/ProxyFileDataServlet.java

@@ -25,9 +25,8 @@ import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FileDataServlet;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -49,12 +48,12 @@ public class ProxyFileDataServlet extends FileDataServlet {
 
   /** {@inheritDoc} */
   @Override
-  protected URI createUri(FileStatus i, UserGroupInformation ugi,
+  protected URI createUri(String parent, HdfsFileStatus i, UserGroupInformation ugi,
       ClientProtocol nnproxy, HttpServletRequest request) throws IOException,
       URISyntaxException {
     return new URI(request.getScheme(), null, request.getServerName(), request
-        .getServerPort(), "/streamFile", "filename=" + i.getPath() + "&ugi="
-        + ugi.getShortUserName(), null);
+        .getServerPort(), "/streamFile", "filename=" + i.getFullName(parent) 
+        + "&ugi=" + ugi.getShortUserName(), null);
   }
 
   /** {@inheritDoc} */

+ 17 - 6
src/java/org/apache/hadoop/fs/Hdfs.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Progressable;
 
@@ -99,15 +100,24 @@ public class Hdfs extends AbstractFileSystem {
 
   @Override
   protected FileStatus getFileStatus(Path f) throws IOException {
-    FileStatus fi = dfs.getFileInfo(getUriPath(f));
+    HdfsFileStatus fi = dfs.getFileInfo(getUriPath(f));
     if (fi != null) {
-      fi.setPath(fi.getPath().makeQualified(getUri(), null));
-      return fi;
+      return makeQualified(fi, f);
     } else {
       throw new FileNotFoundException("File does not exist: " + f.toString());
     }
   }
 
+  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
+    return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
+        f.getBlockSize(), f.getModificationTime(),
+        f.getAccessTime(),
+        f.getPermission(), f.getOwner(), f.getGroup(),
+        (f.getFullPath(parent)).makeQualified(
+            getUri(), null)); // fully-qualify path
+  }
+
+
   @Override
   protected FsStatus getFsStatus() throws IOException {
     return dfs.getDiskStatus();
@@ -120,14 +130,15 @@ public class Hdfs extends AbstractFileSystem {
 
   @Override
   protected FileStatus[] listStatus(Path f) throws IOException {
-    FileStatus[] infos = dfs.listPaths(getUriPath(f));
+    HdfsFileStatus[] infos = dfs.listPaths(getUriPath(f));
     if (infos == null)
       throw new FileNotFoundException("File " + f + " does not exist.");
 
+    FileStatus [] stats = new FileStatus[infos.length]; 
     for (int i = 0; i < infos.length; i++) {
-      infos[i].setPath(infos[i].getPath().makeQualified(getUri(), null));
+      stats[i] = makeQualified(infos[i], f);
     }
-    return infos;
+    return stats;
   }
 
   @Override

+ 4 - 4
src/java/org/apache/hadoop/hdfs/DFSClient.java

@@ -50,7 +50,6 @@ import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
@@ -67,6 +66,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -649,7 +649,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   OutputStream append(String src, int buffersize, Progressable progress
       ) throws IOException {
     checkOpen();
-    FileStatus stat = null;
+    HdfsFileStatus stat = null;
     LocatedBlock lastBlock = null;
     try {
       stat = getFileInfo(src);
@@ -763,7 +763,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     return getFileInfo(src) != null;
   }
 
-  public FileStatus[] listPaths(String src) throws IOException {
+  public HdfsFileStatus[] listPaths(String src) throws IOException {
     checkOpen();
     try {
       return namenode.getListing(src);
@@ -772,7 +772,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     }
   }
 
-  public FileStatus getFileInfo(String src) throws IOException {
+  public HdfsFileStatus getFileInfo(String src) throws IOException {
     checkOpen();
     try {
       return namenode.getFileInfo(src);

+ 3 - 2
src/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
@@ -291,7 +292,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
      * @param bytesPerChecksum number of bytes per checksum
      * @throws IOException if error occurs
      */
-    private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+    private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
        int bytesPerChecksum) throws IOException {
      stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
      block = lastBlock.getBlock();
@@ -1072,7 +1073,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
   */
  DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
-      LocatedBlock lastBlock, FileStatus stat,
+      LocatedBlock lastBlock, HdfsFileStatus stat,
      int bytesPerChecksum) throws IOException {
    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum);
    initialFileSize = stat.getLen(); // length of file when opened

+ 25 - 0
src/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,5 +97,29 @@
     String user = conf.get(userNameKey, System.getProperty("user.name"));
     UserGroupInformation.loginUserFromKeytab(user, keytabFilename);
   }
+
+  /**
+   * Converts a byte array to a string using UTF8 encoding.
+   */
+  public static String bytes2String(byte[] bytes) {
+    try {
+      return new String(bytes, "UTF8");
+    } catch(UnsupportedEncodingException e) {
+      assert false : "UTF8 encoding is not supported ";
+    }
+    return null;
+  }
+
+  /**
+   * Converts a string to a byte array using UTF8 encoding.
+   */
+  public static byte[] string2Bytes(String str) {
+    try {
+      return str.getBytes("UTF8");
+    } catch(UnsupportedEncodingException e) {
+      assert false : "UTF8 encoding is not supported ";
+    }
+    return null;
+  }
 }
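
The two helpers above are the UTF-8 conversion that INode and HdfsFileStatus now share for local names. A quick round-trip sketch, not part of the commit and assuming only the behavior of the methods as written above; the file name and class are made up.

import org.apache.hadoop.hdfs.DFSUtil;

// Sanity sketch: an inode's local name survives the string -> UTF-8 bytes -> string
// round trip used when the name travels inside HdfsFileStatus.
class Utf8NameRoundTrip {
  public static void main(String[] args) {
    String localName = "report-2010.txt";
    byte[] wireForm = DFSUtil.string2Bytes(localName);  // form stored in INode.name
    String decoded = DFSUtil.bytes2String(wireForm);    // form read back by the client
    assert localName.equals(decoded);
    System.out.println(decoded + " (" + wireForm.length + " UTF-8 bytes)");
  }
}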
 
 

+ 8 - 6
src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
@@ -303,23 +304,24 @@ public class DistributedFileSystem extends FileSystem {
     dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
   }
   
-  private FileStatus makeQualified(FileStatus f) {
+  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
     return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
         f.getBlockSize(), f.getModificationTime(),
         f.getAccessTime(),
         f.getPermission(), f.getOwner(), f.getGroup(),
-        f.getPath().makeQualified(this)); // fully-qualify path
+        (f.getFullPath(parent)).makeQualified(
+            getUri(), getWorkingDirectory())); // fully-qualify path
   }
 
   @Override
   public FileStatus[] listStatus(Path p) throws IOException {
-    FileStatus[] infos = dfs.listPaths(getPathName(p));
+    HdfsFileStatus[] infos = dfs.listPaths(getPathName(p));
     if (infos == null) 
       throw new FileNotFoundException("File " + p + " does not exist.");
     
     FileStatus[] stats = new FileStatus[infos.length];
     for (int i = 0; i < infos.length; i++) {
-      stats[i] = makeQualified(infos[i]);
+      stats[i] = makeQualified(infos[i], p);
     }
     return stats;
   }
@@ -564,9 +566,9 @@ public class DistributedFileSystem extends FileSystem {
    */
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    FileStatus fi = dfs.getFileInfo(getPathName(f));
+    HdfsFileStatus fi = dfs.getFileInfo(getPathName(f));
     if (fi != null) {
-      return makeQualified(fi);
+      return makeQualified(fi, f);
     } else {
       throw new FileNotFoundException("File does not exist: " + f);
     }

+ 6 - 6
src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -21,8 +21,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -55,9 +53,11 @@ public interface ClientProtocol extends VersionedProtocol {
   * Compared to the previous version the following changes have been introduced:
   * (Only the latest change is reflected.
   * The log of historical changes can be retrieved from the svn).
-   * 55: Adding Delegation Token related APIs
+   * 57: getFileInfo returns HDFSFileStatus;
+   *     getListing returns HDFSFileStatus[].
+   * 
   */
-  public static final long versionID = 55L;
+  public static final long versionID = 57L;
  
  ///////////////////////////////////////
  // File contents
@@ -334,7 +334,7 @@ public interface ClientProtocol extends VersionedProtocol {
  /**
   * Get a listing of the indicated directory
   */
-  public FileStatus[] getListing(String src) throws IOException;
+  public HdfsFileStatus[] getListing(String src) throws IOException;
 
  ///////////////////////////////////////
  // System issues and management
@@ -511,7 +511,7 @@ public interface ClientProtocol extends VersionedProtocol {
   * @return object containing information regarding the file
   *         or null if file not found
   */
-  public FileStatus getFileInfo(String src) throws IOException;
+  public HdfsFileStatus getFileInfo(String src) throws IOException;
 
  /**
   * Get {@link ContentSummary} rooted at the specified directory.

+ 236 - 0
src/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java

@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/** Interface that represents the over the wire information for a file.
+ */
+public class HdfsFileStatus implements Writable {
+
+  private byte[] path;  // local name of the inode that's encoded in java UTF8
+  private long length;
+  private boolean isdir;
+  private short block_replication;
+  private long blocksize;
+  private long modification_time;
+  private long access_time;
+  private FsPermission permission;
+  private String owner;
+  private String group;
+  
+  public static final byte[] EMPTY_NAME = new byte[0];
+
+  /**
+   * default constructor
+   */
+  public HdfsFileStatus() { this(0, false, 0, 0, 0, 0, null, null, null, null); }
+  
+  /**
+   * Constructor
+   * @param length the number of bytes the file has
+   * @param isdir if the path is a directory
+   * @param block_replication the replication factor
+   * @param blocksize the block size
+   * @param modification_time modification time
+   * @param access_time access time
+   * @param permission permission
+   * @param owner the owner of the path
+   * @param group the group of the path
+   * @param path the local name in java UTF8 encoding the same as that in-memory
+   */
+  public HdfsFileStatus(long length, boolean isdir, int block_replication,
+                    long blocksize, long modification_time, long access_time,
+                    FsPermission permission, String owner, String group, 
+                    byte[] path) {
+    this.length = length;
+    this.isdir = isdir;
+    this.block_replication = (short)block_replication;
+    this.blocksize = blocksize;
+    this.modification_time = modification_time;
+    this.access_time = access_time;
+    this.permission = (permission == null) ? 
+                      FsPermission.getDefault() : permission;
+    this.owner = (owner == null) ? "" : owner;
+    this.group = (group == null) ? "" : group;
+    this.path = path;
+  }
+
+  /**
+   * Get the length of this file, in bytes.
+   * @return the length of this file, in bytes.
+   */
+  final public long getLen() {
+    return length;
+  }
+
+  /**
+   * Is this a directory?
+   * @return true if this is a directory
+   */
+  final public boolean isDir() {
+    return isdir;
+  }
+
+  /**
+   * Get the block size of the file.
+   * @return the number of bytes
+   */
+  final public long getBlockSize() {
+    return blocksize;
+  }
+
+  /**
+   * Get the replication factor of a file.
+   * @return the replication factor of a file.
+   */
+  final public short getReplication() {
+    return block_replication;
+  }
+
+  /**
+   * Get the modification time of the file.
+   * @return the modification time of file in milliseconds since January 1, 1970 UTC.
+   */
+  final public long getModificationTime() {
+    return modification_time;
+  }
+
+  /**
+   * Get the access time of the file.
+   * @return the access time of file in milliseconds since January 1, 1970 UTC.
+   */
+  final public long getAccessTime() {
+    return access_time;
+  }
+
+  /**
+   * Get FsPermission associated with the file.
+   * @return permssion
+   */
+  final public FsPermission getPermission() {
+    return permission;
+  }
+  
+  /**
+   * Get the owner of the file.
+   * @return owner of the file
+   */
+  final public String getOwner() {
+    return owner;
+  }
+  
+  /**
+   * Get the group associated with the file.
+   * @return group for the file. 
+   */
+  final public String getGroup() {
+    return group;
+  }
+  
+  /**
+   * Check if the local name is empty
+   * @return true if the name is empty
+   */
+  final public boolean isEmptyLocalName() {
+    return path.length == 0;
+  }
+
+  /**
+   * Get the string representation of the local name
+   * @return the local name in string
+   */
+  final public String getLocalName() {
+    return DFSUtil.bytes2String(path);
+  }
+  
+  /**
+   * Get the string representation of the full path name
+   * @param parent the parent path
+   * @return the full path in string
+   */
+  final public String getFullName(final String parent) {
+    if (isEmptyLocalName()) {
+      return parent;
+    }
+    
+    StringBuilder fullName = new StringBuilder(parent);
+    if (!parent.endsWith(Path.SEPARATOR)) {
+      fullName.append(Path.SEPARATOR);
+    }
+    fullName.append(getLocalName());
+    return fullName.toString();
+  }
+
+  /**
+   * Get the full path
+   * @param parent the parent path
+   * @return the full path
+   */
+  final public Path getFullPath(final Path parent) {
+    if (isEmptyLocalName()) {
+      return parent;
+    }
+    
+    return new Path(parent, getLocalName());
+  }
+
+  //////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(path.length);
+    out.write(path);
+    out.writeLong(length);
+    out.writeBoolean(isdir);
+    out.writeShort(block_replication);
+    out.writeLong(blocksize);
+    out.writeLong(modification_time);
+    out.writeLong(access_time);
+    permission.write(out);
+    Text.writeString(out, owner);
+    Text.writeString(out, group);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int numOfBytes = in.readInt();
+    if (numOfBytes == 0) {
+      this.path = EMPTY_NAME;
+    } else {
+      this.path = new byte[numOfBytes];
+      in.readFully(path);
+    }
+    this.length = in.readLong();
+    this.isdir = in.readBoolean();
+    this.block_replication = in.readShort();
+    blocksize = in.readLong();
+    modification_time = in.readLong();
+    access_time = in.readLong();
+    permission.readFields(in);
+    owner = Text.readString(in);
+    group = Text.readString(in);
+  }
+}
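
One convention in the new class worth calling out: when a status describes the queried path itself, the NameNode sends an empty local name (FSDirectory passes HdfsFileStatus.EMPTY_NAME further below), and getFullName/getFullPath then return the parent argument unchanged. A hedged illustration using only the constructor and accessors defined above; the concrete paths and values are made up.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

// Empty local name means "this status is about the path you asked for".
class LocalNameConvention {
  public static void main(String[] args) {
    HdfsFileStatus self = new HdfsFileStatus(0, true, 0, 0, 0, 0,
        null, null, null, HdfsFileStatus.EMPTY_NAME);         // e.g. getFileInfo("/user/alice")
    HdfsFileStatus child = new HdfsFileStatus(12L, false, 3, 64 * 1024 * 1024, 0, 0,
        null, null, null, DFSUtil.string2Bytes("data.txt"));  // one getListing("/user/alice") entry

    System.out.println(self.getFullName("/user/alice"));            // /user/alice
    System.out.println(child.getFullPath(new Path("/user/alice"))); // /user/alice/data.txt
  }
}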

+ 7 - 6
src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java

@@ -29,11 +29,11 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.jsp.JspWriter;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
@@ -59,7 +59,7 @@ class DatanodeJspHelper {
     final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
         JspHelper.conf);
     String target = dir;
-    final FileStatus targetStatus = dfs.getFileInfo(target);
+    final HdfsFileStatus targetStatus = dfs.getFileInfo(target);
     if (targetStatus == null) { // not exists
       out.print("<h3>File or directory : " + target + " does not exist</h3>");
       JspHelper.printGotoForm(out, namenodeInfoPort, target);
@@ -95,7 +95,7 @@ class DatanodeJspHelper {
         return;
       }
       // directory
-      FileStatus[] files = dfs.listPaths(target);
+      HdfsFileStatus[] files = dfs.listPaths(target);
       // generate a table and dump the info
       String[] headings = { "Name", "Type", "Size", "Replication",
           "Block Size", "Modification Time", "Permission", "Owner", "Group" };
@@ -120,8 +120,9 @@ class DatanodeJspHelper {
         JspHelper.addTableRow(out, headings, row++);
         String cols[] = new String[headings.length];
         for (int i = 0; i < files.length; i++) {
+          String localFileName = files[i].getLocalName();
           // Get the location of the first block of the file
-          if (files[i].getPath().toString().endsWith(".crc"))
+          if (localFileName.endsWith(".crc"))
             continue;
           if (!files[i].isDir()) {
             cols[1] = "file";
@@ -135,10 +136,10 @@ class DatanodeJspHelper {
             cols[4] = "";
           }
           String datanodeUrl = req.getRequestURL() + "?dir="
-              + URLEncoder.encode(files[i].getPath().toString(), "UTF-8")
+              + URLEncoder.encode(files[i].getFullName(target), "UTF-8")
              + "&namenodeInfoPort=" + namenodeInfoPort;
          cols[0] = "<a href=\"" + datanodeUrl + "\">"
-              + files[i].getPath().getName() + "</a>";
+              + localFileName + "</a>";
          cols[5] = FsShell.dateForm.format(new Date((files[i]
              .getModificationTime())));
          cols[6] = files[i].getPermission().toString();

+ 12 - 12
src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -1018,7 +1018,7 @@ class FSDirectory implements Closeable {
   * This function is admittedly very inefficient right now.  We'll
   * make it better later.
   */
-  FileStatus[] getListing(String src) {
+  HdfsFileStatus[] getListing(String src) {
     String srcs = normalizePath(src);
 
     synchronized (rootDir) {
@@ -1026,15 +1026,14 @@ class FSDirectory implements Closeable {
       if (targetNode == null)
         return null;
       if (!targetNode.isDirectory()) {
-        return new FileStatus[]{createFileStatus(srcs, targetNode)};
+        return new HdfsFileStatus[]{createFileStatus(
+            HdfsFileStatus.EMPTY_NAME, targetNode)};
       }
       List<INode> contents = ((INodeDirectory)targetNode).getChildren();
-      FileStatus listing[] = new FileStatus[contents.size()];
-      if(! srcs.endsWith(Path.SEPARATOR))
-        srcs += Path.SEPARATOR;
+      HdfsFileStatus listing[] = new HdfsFileStatus[contents.size()];
       int i = 0;
       for (INode cur : contents) {
-        listing[i] = createFileStatus(srcs+cur.getLocalName(), cur);
+        listing[i] = createFileStatus(cur.name, cur);
         i++;
       }
       return listing;
@@ -1046,7 +1045,7 @@ class FSDirectory implements Closeable {
   * @return object containing information regarding the file
   *         or null if file not found
   */
-  FileStatus getFileInfo(String src) {
+  HdfsFileStatus getFileInfo(String src) {
     String srcs = normalizePath(src);
     synchronized (rootDir) {
       INode targetNode = rootDir.getNode(srcs);
@@ -1054,7 +1053,7 @@ class FSDirectory implements Closeable {
         return null;
       }
       else {
-        return createFileStatus(srcs, targetNode);
+        return createFileStatus(HdfsFileStatus.EMPTY_NAME, targetNode);
       }
     }
   }
@@ -1708,9 +1707,10 @@ class FSDirectory implements Closeable {
   /**
    * Create FileStatus by file INode 
    */
-   private static FileStatus createFileStatus(String path, INode node) {
+   private static HdfsFileStatus createFileStatus(byte[] path, INode node) {
     // length is zero for directories
-    return new FileStatus(node.isDirectory() ? 0 : node.computeContentSummary().getLength(), 
+    return new HdfsFileStatus(
+        node instanceof INodeFile ? ((INodeFile)node).computeFileSize(true) : 0, 
        node.isDirectory(), 
        node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
        node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
@@ -1719,6 +1719,6 @@ class FSDirectory implements Closeable {
        node.getFsPermission(),
        node.getUserName(),
        node.getGroupName(),
-        new Path(path));
+        path);
   }
 }

+ 3 - 3
src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -35,6 +34,7 @@ import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
@@ -577,7 +577,7 @@ public class FSEditLog {
           String s = FSImage.readString(in);
           String d = FSImage.readString(in);
           timestamp = readLong(in);
-          FileStatus dinfo = fsDir.getFileInfo(d);
+          HdfsFileStatus dinfo = fsDir.getFileInfo(d);
           fsDir.unprotectedRenameTo(s, d, timestamp);
           fsNamesys.changeLease(s, d, dinfo);
           break;
@@ -714,7 +714,7 @@ public class FSEditLog {
           String d = FSImage.readString(in);
           timestamp = readLong(in);
           Rename[] options = readRenameOptions(in);
-          FileStatus dinfo = fsDir.getFileInfo(d);
+          HdfsFileStatus dinfo = fsDir.getFileInfo(d);
           fsDir.unprotectedRenameTo(s, d, timestamp, options);
           fsNamesys.changeLease(s, d, dinfo);
           break;

+ 2 - 1
src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -149,7 +150,7 @@ public class FSImage extends Storage {
   * Used for saving the image to disk
   */
  static private final FsPermission FILE_PERM = new FsPermission((short)0);
-  static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR);
+  static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
 
  /**
   */

+ 15 - 17
src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -62,7 +62,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Options;
@@ -87,7 +86,6 @@ import java.util.Map.Entry;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
-import javax.security.auth.login.LoginException;
 
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
@@ -120,7 +118,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
 
   private static final void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
-      FileStatus stat) {
+      HdfsFileStatus stat) {
     final Formatter fmt = auditFormatter.get();
     ((StringBuilder)fmt.out()).setLength(0);
     auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
@@ -641,7 +639,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     dir.setPermission(src, permission);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setPermission", src, null, stat);
@@ -669,7 +667,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     dir.setOwner(src, username, group);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setOwner", src, null, stat);
@@ -728,7 +726,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
                                                        ) throws IOException {
     INodeFile inode = dir.getFileINode(src);
     if (inode == null)
-      throw new FileNotFoundException();
+      throw new FileNotFoundException(src);
     if (doAccessTime && isAccessTimeSupported()) {
       dir.setTimes(src, inode, -1, now(), false);
     }
@@ -906,7 +904,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    
    
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(target);
+      final HdfsFileStatus stat = dir.getFileInfo(target);
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     Server.getRemoteIp(),
                     "concat", Arrays.toString(srcs), target, stat);
@@ -933,7 +931,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     if (inode != null) {
       dir.setTimes(src, inode, mtime, atime, true);
       if (auditLog.isInfoEnabled()) {
-        final FileStatus stat = dir.getFileInfo(src);
+        final HdfsFileStatus stat = dir.getFileInfo(src);
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       "setTimes", src, null, stat);
@@ -1046,7 +1044,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
         createParent, replication, blockSize);
     getEditLog().logSync();
     if (auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "create", src, null, stat);
@@ -1601,7 +1599,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(dst);
+      final HdfsFileStatus stat = dir.getFileInfo(dst);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "rename", src, dst, stat);
@@ -1629,7 +1627,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       checkAncestorAccess(actualdst, FsAction.WRITE);
     }
 
-    FileStatus dinfo = dir.getFileInfo(dst);
+    HdfsFileStatus dinfo = dir.getFileInfo(dst);
     if (dir.renameTo(src, dst)) {
       changeLease(src, dst, dinfo);     // update lease with new filename
       return true;
@@ -1648,7 +1646,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
       }
-      final FileStatus stat = dir.getFileInfo(dst);
+      final HdfsFileStatus stat = dir.getFileInfo(dst);
       logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(),
                     cmd.toString(), src, dst, stat);
     }
@@ -1671,7 +1669,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
       checkAncestorAccess(dst, FsAction.WRITE);
     }
 
-    FileStatus dinfo = dir.getFileInfo(dst);
+    HdfsFileStatus dinfo = dir.getFileInfo(dst);
     dir.renameTo(src, dst, options);
     changeLease(src, dst, dinfo); // update lease with new filename
   }
@@ -1770,7 +1768,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * @return object containing information regarding the file
    *         or null if file not found
    */
-  FileStatus getFileInfo(String src) throws IOException {
+  HdfsFileStatus getFileInfo(String src) throws IOException {
     if (!DFSUtil.isValidName(src)) {
       throw new IOException("Invalid file name: " + src);
     }
@@ -1788,7 +1786,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
     boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
     if (status && auditLog.isInfoEnabled()) {
-      final FileStatus stat = dir.getFileInfo(src);
+      final HdfsFileStatus stat = dir.getFileInfo(src);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "mkdirs", src, null, stat);
@@ -2139,7 +2137,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
    * Get a listing of all files at 'src'.  The Object[] array
    * exists so we can return file attributes (soon to be implemented)
    */
-  public FileStatus[] getListing(String src) throws IOException {
+  public HdfsFileStatus[] getListing(String src) throws IOException {
     if (isPermissionEnabled) {
       if (dir.isDir(src)) {
         checkPathAccess(src, FsAction.READ_EXECUTE);
@@ -4186,7 +4184,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, FSClusterSt
   // rename was successful. If any part of the renamed subtree had
   // files that were being written to, update with new filename.
   //
-  void changeLease(String src, String dst, FileStatus dinfo) 
+  void changeLease(String src, String dst, HdfsFileStatus dinfo) 
                    throws IOException {
     String overwrite;
     String replaceBy;

+ 9 - 8
src/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java

@@ -25,10 +25,11 @@ import java.security.PrivilegedExceptionAction;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.servlet.http.HttpServletResponse;
 
 
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,11 +42,11 @@ public class FileDataServlet extends DfsServlet {
   private static final long serialVersionUID = 1L;
   private static final long serialVersionUID = 1L;
 
 
   /** Create a redirection URI */
   /** Create a redirection URI */
-  protected URI createUri(FileStatus i, UserGroupInformation ugi,
+  protected URI createUri(String parent, HdfsFileStatus i, UserGroupInformation ugi,
       ClientProtocol nnproxy, HttpServletRequest request)
       ClientProtocol nnproxy, HttpServletRequest request)
       throws IOException, URISyntaxException {
       throws IOException, URISyntaxException {
     String scheme = request.getScheme();
     String scheme = request.getScheme();
-    final DatanodeID host = pickSrcDatanode(i, nnproxy);
+    final DatanodeID host = pickSrcDatanode(parent, i, nnproxy);
     final String hostname;
     final String hostname;
     if (host instanceof DatanodeInfo) {
     if (host instanceof DatanodeInfo) {
       hostname = ((DatanodeInfo)host).getHostName();
       hostname = ((DatanodeInfo)host).getHostName();
@@ -56,7 +57,7 @@ public class FileDataServlet extends DfsServlet {
         "https".equals(scheme)
         "https".equals(scheme)
           ? (Integer)getServletContext().getAttribute("datanode.https.port")
           ? (Integer)getServletContext().getAttribute("datanode.https.port")
           : host.getInfoPort(),
           : host.getInfoPort(),
-            "/streamFile", "filename=" + i.getPath() + 
+            "/streamFile", "filename=" + i.getFullName(parent) + 
             "&ugi=" + ugi.getShortUserName(), null);
             "&ugi=" + ugi.getShortUserName(), null);
   }
   }
 
 
@@ -64,10 +65,10 @@ public class FileDataServlet extends DfsServlet {
    * Currently, this looks at no more than the first five blocks of a file,
    * Currently, this looks at no more than the first five blocks of a file,
    * selecting a datanode randomly from the most represented.
    * selecting a datanode randomly from the most represented.
    */
    */
-  private DatanodeID pickSrcDatanode(FileStatus i,
+  private DatanodeID pickSrcDatanode(String parent, HdfsFileStatus i,
       ClientProtocol nnproxy) throws IOException {
       ClientProtocol nnproxy) throws IOException {
     final LocatedBlocks blks = nnproxy.getBlockLocations(
     final LocatedBlocks blks = nnproxy.getBlockLocations(
-        i.getPath().toUri().getPath(), 0, 1);
+        i.getFullPath(new Path(parent)).toUri().getPath(), 0, 1);
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
     if (i.getLen() == 0 || blks.getLocatedBlocks().size() <= 0) {
       // pick a random datanode
       // pick a random datanode
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
       NameNode nn = (NameNode)getServletContext().getAttribute("name.node");
@@ -98,9 +99,9 @@ public class FileDataServlet extends DfsServlet {
 
 
       final String path = request.getPathInfo() != null ? 
       final String path = request.getPathInfo() != null ? 
                                                     request.getPathInfo() : "/";
                                                     request.getPathInfo() : "/";
-      FileStatus info = nnproxy.getFileInfo(path);
+      HdfsFileStatus info = nnproxy.getFileInfo(path);
       if ((info != null) && !info.isDir()) {
       if ((info != null) && !info.isDir()) {
-        response.sendRedirect(createUri(info, ugi, nnproxy,
+        response.sendRedirect(createUri(path, info, ugi, nnproxy,
               request).toURL().toString());
               request).toURL().toString());
       } else if (info == null){
       } else if (info == null){
         response.sendError(400, "cat: File not found " + path);
         response.sendError(400, "cat: File not found " + path);
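
Note on the FileDataServlet change above: the servlet now gets only a local name back from the NameNode and rebuilds the absolute path from the directory it queried, both for the streamFile redirect in createUri() and for the getBlockLocations() call in pickSrcDatanode(). The new HdfsFileStatus class itself is not reproduced in this listing, so the helper below is only a minimal sketch of that parent-plus-local-name reconstruction; the class and method names here are illustrative assumptions, not the committed code.

  import org.apache.hadoop.fs.Path;

  // Hypothetical sketch, not the committed HdfsFileStatus implementation:
  // shows how a parent directory and a local name can be recombined into
  // the full path that getFullName()/getFullPath() hand back to callers.
  class FullPathSketch {
    static String fullName(String parent, String localName) {
      if (localName.isEmpty()) {
        return parent;                       // the status describes the parent itself
      }
      return parent.endsWith(Path.SEPARATOR)
          ? parent + localName
          : parent + Path.SEPARATOR + localName;
    }

    static Path fullPath(Path parent, String localName) {
      return localName.isEmpty() ? parent : new Path(parent, localName);
    }
  }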

+ 11 - 28
src/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -17,13 +17,13 @@
  */
  */
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
-import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.List;
 import java.util.List;
 
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.fs.permission.*;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 
 /**
 /**
@@ -32,6 +32,13 @@ import org.apache.hadoop.hdfs.protocol.Block;
  * directory inodes.
  * directory inodes.
  */
  */
 abstract class INode implements Comparable<byte[]>, FSInodeInfo {
 abstract class INode implements Comparable<byte[]>, FSInodeInfo {
+  /*
+   * The inode name is stored in java UTF8 encoding;
+   * the name carried in HdfsFileStatus must keep the same encoding.
+   * If this encoding is ever changed, getFileInfo and listStatus in
+   * ClientProtocol change implicitly, and the decoding on the
+   * client side must change accordingly.
+   */
   protected byte[] name;
   protected byte[] name;
   protected INodeDirectory parent;
   protected INodeDirectory parent;
   protected long modificationTime;
   protected long modificationTime;
@@ -219,7 +226,7 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
    * @return local file name
    * @return local file name
    */
    */
   String getLocalName() {
   String getLocalName() {
-    return bytes2String(name);
+    return DFSUtil.bytes2String(name);
   }
   }
 
 
   /**
   /**
@@ -234,7 +241,7 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
    * Set local file name
    * Set local file name
    */
    */
   void setLocalName(String name) {
   void setLocalName(String name) {
-    this.name = string2Bytes(name);
+    this.name = DFSUtil.string2Bytes(name);
   }
   }
 
 
   /**
   /**
@@ -328,7 +335,7 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
     }
     }
     byte[][] bytes = new byte[strings.length][];
     byte[][] bytes = new byte[strings.length][];
     for (int i = 0; i < strings.length; i++)
     for (int i = 0; i < strings.length; i++)
-      bytes[i] = string2Bytes(strings[i]);
+      bytes[i] = DFSUtil.string2Bytes(strings[i]);
     return bytes;
     return bytes;
   }
   }
 
 
@@ -397,28 +404,4 @@ abstract class INode implements Comparable<byte[]>, FSInodeInfo {
     }
     }
     return len1 - len2;
     return len1 - len2;
   }
   }
-
-  /**
-   * Converts a byte array to a string using UTF8 encoding.
-   */
-  static String bytes2String(byte[] bytes) {
-    try {
-      return new String(bytes, "UTF8");
-    } catch(UnsupportedEncodingException e) {
-      assert false : "UTF8 encoding is not supported ";
-    }
-    return null;
-  }
-
-  /**
-   * Converts a string to a byte array using UTF8 encoding.
-   */
-  static byte[] string2Bytes(String str) {
-    try {
-      return str.getBytes("UTF8");
-    } catch(UnsupportedEncodingException e) {
-      assert false : "UTF8 encoding is not supported ";
-    }
-    return null;
-  }
 }
 }
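
The UTF8 helpers deleted from INode above are not gone; INode now calls them through DFSUtil (note the 25 lines added to DFSUtil.java in this change, whose hunk is not reproduced in this listing). Assuming they were moved over essentially verbatim, the DFSUtil versions mirror the removed code:

  import java.io.UnsupportedEncodingException;

  // Presumed form of the helpers after the move to DFSUtil,
  // copied from the code removed from INode above.
  public class DFSUtilStringSketch {
    public static String bytes2String(byte[] bytes) {
      try {
        return new String(bytes, "UTF8");
      } catch (UnsupportedEncodingException e) {
        assert false : "UTF8 encoding is not supported ";
      }
      return null;
    }

    public static byte[] string2Bytes(String str) {
      try {
        return str.getBytes("UTF8");
      } catch (UnsupportedEncodingException e) {
        assert false : "UTF8 encoding is not supported ";
      }
      return null;
    }
  }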

+ 3 - 2
src/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 
 /**
 /**
@@ -95,7 +96,7 @@ class INodeDirectory extends INode {
   }
   }
   
   
   INode getChild(String name) {
   INode getChild(String name) {
-    return getChildINode(string2Bytes(name));
+    return getChildINode(DFSUtil.string2Bytes(name));
   }
   }
 
 
   private INode getChildINode(byte[] name) {
   private INode getChildINode(byte[] name) {
@@ -161,7 +162,7 @@ class INodeDirectory extends INode {
   int getExistingPathINodes(byte[][] components, INode[] existing) {
   int getExistingPathINodes(byte[][] components, INode[] existing) {
     assert compareBytes(this.name, components[0]) == 0 :
     assert compareBytes(this.name, components[0]) == 0 :
       "Incorrect name " + getLocalName() + " expected " + 
       "Incorrect name " + getLocalName() + " expected " + 
-      bytes2String(components[0]);
+      DFSUtil.bytes2String(components[0]);
 
 
     INode curNode = this;
     INode curNode = this;
     int count = 0;
     int count = 0;

+ 13 - 10
src/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java

@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs.server.namenode;
 package org.apache.hadoop.hdfs.server.namenode;
 
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionInfo;
 
 
@@ -59,10 +61,10 @@ public class ListPathsServlet extends DfsServlet {
    * Node information includes path, modification, permission, owner and group.
    * Node information includes path, modification, permission, owner and group.
    * For files, it also includes size, replication and block-size. 
    * For files, it also includes size, replication and block-size. 
    */
    */
-  static void writeInfo(FileStatus i, XMLOutputter doc) throws IOException {
+  static void writeInfo(String parent, HdfsFileStatus i, XMLOutputter doc) throws IOException {
     final SimpleDateFormat ldf = df.get();
     final SimpleDateFormat ldf = df.get();
     doc.startTag(i.isDir() ? "directory" : "file");
     doc.startTag(i.isDir() ? "directory" : "file");
-    doc.attribute("path", i.getPath().toUri().getPath());
+    doc.attribute("path", i.getFullPath(new Path(parent)).toUri().getPath());
     doc.attribute("modified", ldf.format(new Date(i.getModificationTime())));
     doc.attribute("modified", ldf.format(new Date(i.getModificationTime())));
     doc.attribute("accesstime", ldf.format(new Date(i.getAccessTime())));
     doc.attribute("accesstime", ldf.format(new Date(i.getAccessTime())));
     if (!i.isDir()) {
     if (!i.isDir()) {
@@ -148,9 +150,9 @@ public class ListPathsServlet extends DfsServlet {
         doc.attribute(m.getKey(), m.getValue());
         doc.attribute(m.getKey(), m.getValue());
       }
       }
 
 
-      FileStatus base = nnproxy.getFileInfo(path);
+      HdfsFileStatus base = nnproxy.getFileInfo(path);
       if ((base != null) && base.isDir()) {
       if ((base != null) && base.isDir()) {
-        writeInfo(base, doc);
+        writeInfo(path, base, doc);
       }
       }
 
 
       Stack<String> pathstack = new Stack<String>();
       Stack<String> pathstack = new Stack<String>();
@@ -158,20 +160,21 @@ public class ListPathsServlet extends DfsServlet {
       while (!pathstack.empty()) {
       while (!pathstack.empty()) {
         String p = pathstack.pop();
         String p = pathstack.pop();
         try {
         try {
-          FileStatus[] listing = nnproxy.getListing(p);
+          HdfsFileStatus[] listing = nnproxy.getListing(p);
           if (listing == null) {
           if (listing == null) {
             LOG.warn("ListPathsServlet - Path " + p + " does not exist");
             LOG.warn("ListPathsServlet - Path " + p + " does not exist");
             continue;
             continue;
           }
           }
-          for (FileStatus i : listing) {
-            if (exclude.matcher(i.getPath().getName()).matches()
-                || !filter.matcher(i.getPath().getName()).matches()) {
+          for (HdfsFileStatus i : listing) {
+            String localName = i.getLocalName();
+            if (exclude.matcher(localName).matches()
+                || !filter.matcher(localName).matches()) {
               continue;
               continue;
             }
             }
             if (recur && i.isDir()) {
             if (recur && i.isDir()) {
-              pathstack.push(i.getPath().toUri().getPath());
+              pathstack.push(new Path(p, localName).toUri().getPath());
             }
             }
-            writeInfo(i, doc);
+            writeInfo(p, i, doc);
           }
           }
         }
         }
         catch(RemoteException re) {re.writeXml(p, doc);}
         catch(RemoteException re) {re.writeXml(p, doc);}
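
Because getListing() now returns local names only, ListPathsServlet rebuilds each child path against the directory popped off the stack before pushing it for further recursion. Below is a condensed sketch of that traversal pattern with the XML output and regex filtering stripped out; the DirLister and Entry types are hypothetical stand-ins, not part of this commit.

  import java.io.IOException;
  import java.util.Stack;
  import org.apache.hadoop.fs.Path;

  interface DirLister { Entry[] list(String dir) throws IOException; }
  class Entry { String localName; boolean isDir; }

  class ListingWalk {
    // Depth-first walk: each full child path is reconstructed as new Path(parent, localName).
    static void walk(DirLister nn, String root) throws IOException {
      Stack<String> pathstack = new Stack<String>();
      pathstack.push(root);
      while (!pathstack.empty()) {
        String p = pathstack.pop();
        Entry[] listing = nn.list(p);              // one RPC per directory
        if (listing == null) {
          continue;                                // path vanished between calls
        }
        for (Entry e : listing) {
          String child = new Path(p, e.localName).toUri().getPath();
          if (e.isDir) {
            pathstack.push(child);                 // visit this directory later
          }
          System.out.println(child);               // stand-in for writeInfo(p, e, doc)
        }
      }
    }
  }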

+ 4 - 4
src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -31,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
@@ -46,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -828,8 +828,8 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
 
 
   /**
   /**
    */
    */
-  public FileStatus[] getListing(String src) throws IOException {
-    FileStatus[] files = namesystem.getListing(src);
+  public HdfsFileStatus[] getListing(String src) throws IOException {
+    HdfsFileStatus[] files = namesystem.getListing(src);
     if (files != null) {
     if (files != null) {
       myMetrics.numGetListingOps.inc();
       myMetrics.numGetListingOps.inc();
       myMetrics.numFilesInGetListingOps.inc(files.length);
       myMetrics.numFilesInGetListingOps.inc(files.length);
@@ -844,7 +844,7 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
    * @return object containing information regarding the file
    * @return object containing information regarding the file
    *         or null if file not found
    *         or null if file not found
    */
    */
-  public FileStatus getFileInfo(String src)  throws IOException {
+  public HdfsFileStatus getFileInfo(String src)  throws IOException {
     myMetrics.numFileInfoOps.inc();
     myMetrics.numFileInfoOps.inc();
     return namesystem.getFileInfo(src);
     return namesystem.getFileInfo(src);
   }
   }
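
On the wire, getFileInfo() and getListing() now carry HdfsFileStatus objects whose name field is just the local name, so it is the client that turns them back into fully qualified FileStatus instances; that conversion lives in the DFSClient/DistributedFileSystem/Hdfs hunks earlier in this commit, which are not reproduced here. A rough sketch of what the conversion amounts to, assuming the usual HdfsFileStatus getters (getOwner, getGroup, getReplication and getBlockSize are not shown in this listing):

  import java.net.URI;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

  class StatusConversionSketch {
    // Sketch only: rebuild a client-side FileStatus from the compact HdfsFileStatus.
    static FileStatus toFileStatus(HdfsFileStatus st, Path parent,
                                   URI fsUri, Path workingDir) {
      Path full = st.getFullPath(parent).makeQualified(fsUri, workingDir);
      return new FileStatus(st.getLen(), st.isDir(), st.getReplication(),
          st.getBlockSize(), st.getModificationTime(), st.getAccessTime(),
          st.getPermission(), st.getOwner(), st.getGroup(), full);
    }
  }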

+ 15 - 14
src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -32,12 +32,12 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -141,10 +141,10 @@ public class NamenodeFsck {
     try {
     try {
       Result res = new Result(conf);
       Result res = new Result(conf);
 
 
-      final FileStatus[] files = namenode.getListing(path);
+      final HdfsFileStatus[] files = namenode.getListing(path);
       if (files != null) {
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
         for (int i = 0; i < files.length; i++) {
-          check(files[i], res);
+          check(path, files[i], res);
         }
         }
         out.println(res);
         out.println(res);
         out.println(" Number of data-nodes:\t\t" + totalDatanodes);
         out.println(" Number of data-nodes:\t\t" + totalDatanodes);
@@ -171,12 +171,12 @@ public class NamenodeFsck {
     }
     }
   }
   }
   
   
-  private void check(FileStatus file, Result res) throws IOException {
-    String path = file.getPath().toString();
+  private void check(String parent, HdfsFileStatus file, Result res) throws IOException {
+    String path = file.getFullName(parent);
     boolean isOpen = false;
     boolean isOpen = false;
 
 
     if (file.isDir()) {
     if (file.isDir()) {
-      final FileStatus[] files = namenode.getListing(path);
+      final HdfsFileStatus[] files = namenode.getListing(path);
       if (files == null) {
       if (files == null) {
         return;
         return;
       }
       }
@@ -185,7 +185,7 @@ public class NamenodeFsck {
       }
       }
       res.totalDirs++;
       res.totalDirs++;
       for (int i = 0; i < files.length; i++) {
       for (int i = 0; i < files.length; i++) {
-        check(files[i], res);
+        check(path, files[i], res);
       }
       }
       return;
       return;
     }
     }
@@ -304,7 +304,7 @@ public class NamenodeFsck {
         break;
         break;
       case FIXING_MOVE:
       case FIXING_MOVE:
         if (!isOpen)
         if (!isOpen)
-          lostFoundMove(file, blocks);
+          lostFoundMove(parent, file, blocks);
         break;
         break;
       case FIXING_DELETE:
       case FIXING_DELETE:
         if (!isOpen)
         if (!isOpen)
@@ -323,7 +323,7 @@ public class NamenodeFsck {
     }
     }
   }
   }
   
   
-  private void lostFoundMove(FileStatus file, LocatedBlocks blocks)
+  private void lostFoundMove(String parent, HdfsFileStatus file, LocatedBlocks blocks)
     throws IOException {
     throws IOException {
     final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
     final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
     try {
     try {
@@ -333,8 +333,9 @@ public class NamenodeFsck {
     if (!lfInitedOk) {
     if (!lfInitedOk) {
       return;
       return;
     }
     }
-    String target = lostFound + file.getPath();
-    String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
+    String fullName = file.getFullName(parent);
+    String target = lostFound + fullName;
+    String errmsg = "Failed to move " + fullName + " to /lost+found";
     try {
     try {
       if (!namenode.mkdirs(target, file.getPermission(), true)) {
       if (!namenode.mkdirs(target, file.getPermission(), true)) {
         LOG.warn(errmsg);
         LOG.warn(errmsg);
@@ -378,8 +379,8 @@ public class NamenodeFsck {
         }
         }
       }
       }
       if (fos != null) fos.close();
       if (fos != null) fos.close();
-      LOG.warn("\n - moved corrupted file " + file.getPath() + " to /lost+found");
-      dfs.delete(file.getPath().toString(), true);
+      LOG.warn("\n - moved corrupted file " + fullName + " to /lost+found");
+      dfs.delete(fullName, true);
     }  catch (Exception e) {
     }  catch (Exception e) {
       e.printStackTrace();
       e.printStackTrace();
       LOG.warn(errmsg + ": " + e.getMessage());
       LOG.warn(errmsg + ": " + e.getMessage());
@@ -500,7 +501,7 @@ public class NamenodeFsck {
     try {
     try {
       String lfName = "/lost+found";
       String lfName = "/lost+found";
       
       
-      final FileStatus lfStatus = dfs.getFileInfo(lfName);
+      final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName);
       if (lfStatus == null) { // not exists
       if (lfStatus == null) { // not exists
         lfInitedOk = dfs.mkdirs(lfName, null, true);
         lfInitedOk = dfs.mkdirs(lfName, null, true);
         lostFound = lfName;
         lostFound = lfName;

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -224,7 +224,7 @@ public class TestDFSClientRetries extends TestCase {
 
 
     public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
     public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { return false; }
 
 
-    public FileStatus[] getListing(String src) throws IOException { return null; }
+    public HdfsFileStatus[] getListing(String src) throws IOException { return null; }
 
 
     public void renewLease(String clientName) throws IOException {}
     public void renewLease(String clientName) throws IOException {}
 
 
@@ -248,7 +248,7 @@ public class TestDFSClientRetries extends TestCase {
 
 
     public void metaSave(String filename) throws IOException {}
     public void metaSave(String filename) throws IOException {}
 
 
-    public FileStatus getFileInfo(String src) throws IOException { return null; }
+    public HdfsFileStatus getFileInfo(String src) throws IOException { return null; }
 
 
     public ContentSummary getContentSummary(String path) throws IOException { return null; }
     public ContentSummary getContentSummary(String path) throws IOException { return null; }
 
 

+ 4 - 4
src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java

@@ -28,9 +28,9 @@ import java.util.zip.CRC32;
 
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -130,11 +130,11 @@ public class TestDFSUpgradeFromImage extends TestCase {
   private void verifyDir(DFSClient client, String dir) 
   private void verifyDir(DFSClient client, String dir) 
                                            throws IOException {
                                            throws IOException {
     
     
-    FileStatus[] fileArr = client.listPaths(dir);
+    HdfsFileStatus[] fileArr = client.listPaths(dir);
     TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
     TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
     
     
-    for(FileStatus file : fileArr) {
-      String path = file.getPath().toString();
+    for(HdfsFileStatus file : fileArr) {
+      String path = file.getFullName(dir);
       fileMap.put(path, Boolean.valueOf(file.isDir()));
       fileMap.put(path, Boolean.valueOf(file.isDir()));
     }
     }
     
     

+ 61 - 16
src/test/hdfs/org/apache/hadoop/hdfs/TestFileStatus.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RemoteException;
@@ -88,7 +89,7 @@ public class TestFileStatus extends TestCase {
                  fs.getFileStatus(path).isDir() == true);
                  fs.getFileStatus(path).isDir() == true);
       
       
       // make sure getFileInfo returns null for files which do not exist
       // make sure getFileInfo returns null for files which do not exist
-      FileStatus fileInfo = dfsClient.getFileInfo("/noSuchFile");
+      HdfsFileStatus fileInfo = dfsClient.getFileInfo("/noSuchFile");
       assertTrue(fileInfo == null);
       assertTrue(fileInfo == null);
 
 
       // make sure getFileInfo throws the appropriate exception for non-relative
       // make sure getFileInfo throws the appropriate exception for non-relative
@@ -107,43 +108,74 @@ public class TestFileStatus extends TestCase {
       System.out.println("Created file filestatus.dat with one "
       System.out.println("Created file filestatus.dat with one "
                          + " replicas.");
                          + " replicas.");
       checkFile(fs, file1, 1);
       checkFile(fs, file1, 1);
-      assertTrue(file1 + " should be a file", 
-                  fs.getFileStatus(file1).isDir() == false);
-      assertTrue(fs.getFileStatus(file1).getBlockSize() == blockSize);
-      assertTrue(fs.getFileStatus(file1).getReplication() == 1);
-      assertTrue(fs.getFileStatus(file1).getLen() == fileSize);
       System.out.println("Path : \"" + file1 + "\"");
       System.out.println("Path : \"" + file1 + "\"");
+      
+      // test getFileStatus on a file
+      FileStatus status = fs.getFileStatus(file1);
+      assertTrue(file1 + " should be a file", 
+                  status.isDir() == false);
+      assertTrue(status.getBlockSize() == blockSize);
+      assertTrue(status.getReplication() == 1);
+      assertTrue(status.getLen() == fileSize);
+      assertEquals(file1.makeQualified(fs.getUri(), 
+          fs.getWorkingDirectory()).toString(), 
+          status.getPath().toString());
+      
+      // test listStatus on a file
+      FileStatus[] stats = fs.listStatus(file1);
+      assertEquals(1, stats.length);
+      status = stats[0];
+      assertTrue(file1 + " should be a file", 
+          status.isDir() == false);
+      assertTrue(status.getBlockSize() == blockSize);
+      assertTrue(status.getReplication() == 1);
+      assertTrue(status.getLen() == fileSize);
+      assertEquals(file1.makeQualified(fs.getUri(), 
+          fs.getWorkingDirectory()).toString(), 
+          status.getPath().toString());
 
 
       // create an empty directory
       // create an empty directory
       //
       //
       Path dir = new Path("/test/mkdirs");
       Path dir = new Path("/test/mkdirs");
       assertTrue(fs.mkdirs(dir));
       assertTrue(fs.mkdirs(dir));
       assertTrue(fs.exists(dir));
       assertTrue(fs.exists(dir));
-      assertTrue(dir + " should be a directory", 
-                 fs.getFileStatus(path).isDir() == true);
+      System.out.println("Dir : \"" + dir + "\"");
+      
+      // test getFileStatus on an empty directory
+      status = fs.getFileStatus(dir);
+      assertTrue(dir + " should be a directory", status.isDir());
+      assertTrue(dir + " should be zero size ", status.getLen() == 0);
+      assertEquals(dir.makeQualified(fs.getUri(), 
+          fs.getWorkingDirectory()).toString(), 
+          status.getPath().toString());
+      
+      // test listStatus on an empty directory
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should be empty", 0, stats.length);
       assertEquals(dir + " should be zero size ",
       assertEquals(dir + " should be zero size ",
           0, fs.getContentSummary(dir).getLength());
           0, fs.getContentSummary(dir).getLength());
       assertEquals(dir + " should be zero size using hftp",
       assertEquals(dir + " should be zero size using hftp",
           0, hftpfs.getContentSummary(dir).getLength());
           0, hftpfs.getContentSummary(dir).getLength());
-      assertTrue(dir + " should be zero size ",
-                 fs.getFileStatus(dir).getLen() == 0);
-      System.out.println("Dir : \"" + dir + "\"");
 
 
       // create another file that is smaller than a block.
       // create another file that is smaller than a block.
       //
       //
-      Path file2 = new Path("/test/mkdirs/filestatus2.dat");
+      Path file2 = new Path(dir, "filestatus2.dat");
       writeFile(fs, file2, 1, blockSize/4, blockSize);
       writeFile(fs, file2, 1, blockSize/4, blockSize);
       System.out.println("Created file filestatus2.dat with one "
       System.out.println("Created file filestatus2.dat with one "
                          + " replicas.");
                          + " replicas.");
       checkFile(fs, file2, 1);
       checkFile(fs, file2, 1);
       System.out.println("Path : \"" + file2 + "\"");
       System.out.println("Path : \"" + file2 + "\"");
-
+      
       // verify file attributes
       // verify file attributes
-      assertTrue(fs.getFileStatus(file2).getBlockSize() == blockSize);
-      assertTrue(fs.getFileStatus(file2).getReplication() == 1);
+      status = fs.getFileStatus(file2);
+      assertTrue(status.getBlockSize() == blockSize);
+      assertTrue(status.getReplication() == 1);
+      assertEquals(file2.makeQualified(
+          fs.getUri(), fs.getWorkingDirectory()).toString(), 
+          status.getPath().toString());
 
 
       // create another file in the same directory
       // create another file in the same directory
-      Path file3 = new Path("/test/mkdirs/filestatus3.dat");
+      Path file3 = new Path(dir, "filestatus3.dat");
       writeFile(fs, file3, 1, blockSize/4, blockSize);
       writeFile(fs, file3, 1, blockSize/4, blockSize);
       System.out.println("Created file filestatus3.dat with one "
       System.out.println("Created file filestatus3.dat with one "
                          + " replicas.");
                          + " replicas.");
@@ -156,6 +188,19 @@ public class TestFileStatus extends TestCase {
           expected, fs.getContentSummary(dir).getLength());
           expected, fs.getContentSummary(dir).getLength());
       assertEquals(dir + " size should be " + expected + " using hftp", 
       assertEquals(dir + " size should be " + expected + " using hftp", 
           expected, hftpfs.getContentSummary(dir).getLength());
           expected, hftpfs.getContentSummary(dir).getLength());
+      
+      // test listStatus on a non-empty directory
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should have two entries", 2, stats.length);
+      String qualifiedFile2 = file2.makeQualified(fs.getUri(), 
+          fs.getWorkingDirectory()).toString();
+      String qualifiedFile3 = file3.makeQualified(fs.getUri(), 
+          fs.getWorkingDirectory()).toString();
+      for(FileStatus stat:stats) {
+        String statusFullName = stat.getPath().toString();
+        assertTrue(qualifiedFile2.equals(statusFullName)
+          || qualifiedFile3.equals(statusFullName));
+      }
     } finally {
     } finally {
       fs.close();
       fs.close();
       cluster.shutdown();
       cluster.shutdown();

+ 3 - 4
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java

@@ -32,12 +32,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -105,7 +104,7 @@ public class TestHDFSConcat {
   public void testConcat() throws IOException, InterruptedException {
   public void testConcat() throws IOException, InterruptedException {
     final int numFiles = 10;
     final int numFiles = 10;
     long fileLen = blockSize*3;
     long fileLen = blockSize*3;
-    FileStatus fStatus;
+    HdfsFileStatus fStatus;
     FSDataInputStream stm;
     FSDataInputStream stm;
     
     
     String trg = new String("/trg");
     String trg = new String("/trg");
@@ -252,7 +251,7 @@ public class TestHDFSConcat {
     Path filePath1 = new Path(name1);
     Path filePath1 = new Path(name1);
     DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
     DFSTestUtil.createFile(dfs, filePath1, trgFileLen, REPL_FACTOR, 1);
     
     
-    FileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
+    HdfsFileStatus fStatus = cluster.getNameNode().getFileInfo(name1);
     long fileLen = fStatus.getLen();
     long fileLen = fStatus.getLen();
     assertEquals(fileLen, trgFileLen);
     assertEquals(fileLen, trgFileLen);