
commit 00cd9e2ce3e18cf8e062d299cd28958c1a87cb51
Author: Hairong Kuang <hairong@ucdev21.inktomisearch.com>
Date: Fri Feb 26 18:40:08 2010 +0000

HDFS-985 from http://issues.apache.org/jira/secure/attachment/12437088/iterativeLS_yahoo1.patc

+++ b/YAHOO-CHANGES.txt
+ HDFS-985. HDFS should issue multiple RPCs for listing a large directory.
+ (hairong)
+


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077229 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley, 14 years ago
parent commit bf3b2a68e3

+ 12 - 3
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -55,7 +55,6 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -624,11 +623,21 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   }
 
   /**
+   * Get a partial listing of the indicated directory
+   * 
+   * Use HdfsFileStatus.EMPTY_NAME as startAfter if the application
+   * wants to fetch a listing starting from the first entry
+   * in the directory.
+   * 
+   * @param src the directory name
+   * @param startAfter the name to start listing after
+   * @return a partial listing starting after startAfter 
    */
-  public HdfsFileStatus[] listPaths(String src) throws IOException {
+  public DirectoryListing listPaths(String src, byte[] startAfter)
+  throws IOException {
     checkOpen();
     try {
-      return namenode.getListing(src);
+      return namenode.getListing(src, startAfter);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class);
     }
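
For context, a client pages through a large directory by calling the new listPaths repeatedly until hasMore() returns false. A minimal usage sketch; the dfsClient variable and the /user/data path are illustrative and not part of this patch:

    // Hypothetical caller-side loop over the new partial-listing API
    // (needs java.util.List/ArrayList and the hdfs.protocol classes above).
    List<HdfsFileStatus> entries = new ArrayList<HdfsFileStatus>();
    byte[] startAfter = HdfsFileStatus.EMPTY_NAME;      // start at the first entry
    DirectoryListing thisListing;
    do {
      thisListing = dfsClient.listPaths("/user/data", startAfter);
      if (thisListing == null) {                        // directory no longer exists
        break;
      }
      for (HdfsFileStatus status : thisListing.getPartialListing()) {
        entries.add(status);
      }
      startAfter = thisListing.getLastName();           // resume after the last name
    } while (thisListing.hasMore());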

+ 2 - 0
src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -87,6 +87,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-server.xml";
   public static final String  DFS_NAMENODE_NAME_DIR_RESTORE_KEY = "dfs.namenode.name.dir.restore";
   public static final boolean DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT = false;
+  public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
+  public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
 
   //Delegation token related keys
   public static final String  DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
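
The batch size is controlled by the new dfs.ls.limit key and defaults to 1000 entries per RPC; non-positive values fall back to the default (see FSDirectory below). A sketch of overriding it, with an illustrative value:

    Configuration conf = new Configuration();
    // Illustrative: cap each getListing RPC at 500 entries instead of the default 1000.
    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 500);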

+ 55 - 6
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 
 import java.io.*;
 import java.net.*;
+import java.util.ArrayList;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.*;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -253,14 +255,61 @@ public class DistributedFileSystem extends FileSystem {
         f.getFullPath(parent).makeQualified(this)); // fully-qualify path
   }
 
+  /**
+   * List all the entries of a directory
+   * 
+   * Note that this operation is not atomic for a large directory.
+   * The entries of a directory may be fetched from the NameNode in
+   * multiple batches. It only guarantees that each name occurs once
+   * if the directory undergoes changes between the calls.
+   */
+  @Override
   public FileStatus[] listStatus(Path p) throws IOException {
-    HdfsFileStatus[] infos = dfs.listPaths(getPathName(p));
-    if (infos == null) return null;
-    FileStatus[] stats = new FileStatus[infos.length];
-    for (int i = 0; i < infos.length; i++) {
-      stats[i] = makeQualified(infos[i], p);
+    String src = getPathName(p);
+    
+    // fetch the first batch of entries in the directory
+    DirectoryListing thisListing = dfs.listPaths(
+        src, HdfsFileStatus.EMPTY_NAME);
+    
+    if (thisListing == null) { // the directory does not exist
+      return null;
     }
-    return stats;
+    
+    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
+    if (!thisListing.hasMore()) { // got all entries of the directory
+      FileStatus[] stats = new FileStatus[partialListing.length];
+      for (int i = 0; i < partialListing.length; i++) {
+        stats[i] = makeQualified(partialListing[i], p);
+      }
+      return stats;
+    }
+    
+    // The directory is too large to return in one batch; more fetches are needed.
+    // Estimate the total number of entries in the directory.
+    int totalNumEntries = 
+      partialListing.length + thisListing.getRemainingEntries();
+    ArrayList<FileStatus> listing = 
+      new ArrayList<FileStatus>(totalNumEntries);
+    // add the first batch of entries to the array list
+    for (HdfsFileStatus fileStatus : partialListing) {
+      listing.add(makeQualified(fileStatus, p));
+    }
+
+    // now fetch more entries
+    do {
+      thisListing = dfs.listPaths(src, thisListing.getLastName());
+      
+      if (thisListing == null) {
+        break; // the directory is deleted
+      }
+      
+      partialListing = thisListing.getPartialListing();
+      for (HdfsFileStatus fileStatus : partialListing) {
+        listing.add(makeQualified(fileStatus, p));
+      }
+    } while (thisListing.hasMore());
+
+    return listing.toArray(new FileStatus[listing.size()]);
   }
 
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
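
Callers of the FileSystem API are unaffected: listStatus still returns the complete directory, now assembled from one or more getListing RPCs behind the scenes. A brief sketch, with an illustrative path:

    FileSystem fs = FileSystem.get(conf);
    // May translate into several getListing RPCs if the directory is large.
    FileStatus[] stats = fs.listStatus(new Path("/user/data"));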

+ 9 - 5
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -49,10 +49,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 44: getFileInfo returns HDFSFileStatus;
-   *     getListing returns HDFSFileStatus[].
+   * 45: Replace full getListing with iterative getListing
    */
-  public static final long versionID = 44L;
+  public static final long versionID = 45L;
   
   ///////////////////////////////////////
   // File contents
@@ -265,9 +264,14 @@ public interface ClientProtocol extends VersionedProtocol {
   public boolean mkdirs(String src, FsPermission masked) throws IOException;
 
   /**
-   * Get a listing of the indicated directory
+   * Get a partial listing of the indicated directory
+   * 
+   * @param src the directory name
+   * @param startAfter the name of the last entry received by the client
+   * @return a partial listing starting after startAfter 
    */
-  public HdfsFileStatus[] getListing(String src) throws IOException;
+  public DirectoryListing getListing(String src, byte[] startAfter)
+  throws IOException;
 
   ///////////////////////////////////////
   // System issues and management

+ 123 - 0
src/hdfs/org/apache/hadoop/hdfs/protocol/DirectoryListing.java

@@ -0,0 +1,123 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * This class defines a partial listing of a directory to support
+ * iterative directory listing.
+ */
+public class DirectoryListing implements Writable {
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (DirectoryListing.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new DirectoryListing(); }
+       });
+  }
+
+  private HdfsFileStatus[] partialListing;
+  private int remainingEntries;
+  
+  /**
+   * default constructor
+   */
+  public DirectoryListing() {
+  }
+  
+  /**
+   * constructor
+   * @param partialListing a partial listing of a directory
+   * @param remainingEntries number of entries that are left to be listed
+   */
+  public DirectoryListing(HdfsFileStatus[] partialListing, 
+      int remainingEntries) {
+    if (partialListing == null) {
+      throw new IllegalArgumentException("partial listing should not be null");
+    }
+    if (partialListing.length == 0 && remainingEntries != 0) {
+      throw new IllegalArgumentException("Partial listing is empty but " +
+          "the number of remaining entries is not zero");
+    }
+    this.partialListing = partialListing;
+    this.remainingEntries = remainingEntries;
+  }
+
+  /**
+   * Get the partial listing of file status
+   * @return the partial listing of file status
+   */
+  public HdfsFileStatus[] getPartialListing() {
+    return partialListing;
+  }
+  
+  /**
+   * Get the number of remaining entries that are left to be listed
+   * @return the number of remaining entries that are left to be listed
+   */
+  public int getRemainingEntries() {
+    return remainingEntries;
+  }
+  
+  /**
+   * Check if there are more entries that are left to be listed
+   * @return true if there are more entries that are left to be listed;
+   *         return false otherwise.
+   */
+  public boolean hasMore() {
+    return remainingEntries != 0;
+  }
+  
+  /**
+   * Get the last name in this list
+   * @return the last name in the list if it is not empty; otherwise return null
+   */
+  public byte[] getLastName() {
+    if (partialListing.length == 0) {
+      return null;
+    }
+    return partialListing[partialListing.length-1].getLocalNameInBytes();
+  }
+
+  // Writable interface
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numEntries = in.readInt();
+    partialListing = new HdfsFileStatus[numEntries];
+    for (int i=0; i<numEntries; i++) {
+      partialListing[i] = new HdfsFileStatus();
+      partialListing[i].readFields(in);
+    }
+    remainingEntries = in.readInt();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(partialListing.length);
+    for (HdfsFileStatus fileStatus : partialListing) {
+      fileStatus.write(out);
+    }
+    out.writeInt(remainingEntries);
+  }
+}
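
As a rough illustration of the wire format, a DirectoryListing can be round-tripped through the Writable interface. This sketch is an assumed test-style setup, using Hadoop's DataOutputBuffer/DataInputBuffer and an empty listing to keep it trivial:

    // Requires org.apache.hadoop.io.DataOutputBuffer and DataInputBuffer.
    DirectoryListing sent = new DirectoryListing(new HdfsFileStatus[0], 0);
    DataOutputBuffer out = new DataOutputBuffer();
    sent.write(out);                        // writes entry count, entries, remainingEntries

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    DirectoryListing received = new DirectoryListing();
    received.readFields(in);

    assert received.getPartialListing().length == 0;
    assert !received.hasMore();             // remainingEntries == 0
    assert received.getLastName() == null;  // empty listing has no last name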

+ 17 - 0
src/hdfs/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java

@@ -26,10 +26,19 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 
 /** Interface that represents the over the wire information for a file.
  */
 public class HdfsFileStatus implements Writable {
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (HdfsFileStatus.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new HdfsFileStatus(); }
+       });
+  }
 
   private byte[] path;  // local name of the inode that's encoded in java UTF8
   private long length;
@@ -167,6 +176,14 @@ public class HdfsFileStatus implements Writable {
     return DFSUtil.bytes2String(path);
   }
   
+  /**
+   * Get the Java UTF8 representation of the local name
+   * @return the local name in java UTF8
+   */
+  final public byte[] getLocalNameInBytes() {
+    return path;
+  }
+  
   /**
    * Get the string representation of the full path name
    * @param parent the parent path

+ 26 - 13
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -27,9 +27,11 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
@@ -51,6 +53,7 @@ class FSDirectory implements FSConstants, Closeable {
   private boolean ready = false;
   // Metrics record
   private MetricsRecord directoryMetrics = null;
+  private final int lsLimit;  // max list limit
 
   /** Access an existing dfs name directory. */
   FSDirectory(FSNamesystem ns, Configuration conf) {
@@ -65,6 +68,10 @@ class FSDirectory implements FSConstants, Closeable {
         Integer.MAX_VALUE, -1);
     this.fsImage = fsImage;
     namesystem = ns;
+    int configuredLimit = conf.getInt(
+        DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    this.lsLimit = configuredLimit>0 ? 
+        configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
     initialize(conf);
   }
     
@@ -701,30 +708,36 @@ class FSDirectory implements FSConstants, Closeable {
   }
 
   /**
-   * Get a listing of files given path 'src'
-   *
-   * This function is admittedly very inefficient right now.  We'll
-   * make it better later.
+   * Get a partial listing of the indicated directory
+   * 
+   * @param src the directory name
+   * @param startAfter the name to start listing after
+   * @return a partial listing starting after startAfter 
    */
-  HdfsFileStatus[] getListing(String src) {
+  DirectoryListing getListing(String src, byte[] startAfter) {
     String srcs = normalizePath(src);
 
     synchronized (rootDir) {
       INode targetNode = rootDir.getNode(srcs);
       if (targetNode == null)
         return null;
+      
       if (!targetNode.isDirectory()) {
-        return new HdfsFileStatus[]{createFileStatus(
-            HdfsFileStatus.EMPTY_NAME, targetNode)};
+        return new DirectoryListing(new HdfsFileStatus[]{createFileStatus(
+            HdfsFileStatus.EMPTY_NAME, targetNode)}, 0);
       }
-      List<INode> contents = ((INodeDirectory)targetNode).getChildren();
-      HdfsFileStatus listing[] = new HdfsFileStatus[contents.size()];
-      int i = 0;
-      for (INode cur : contents) {
+      INodeDirectory dirInode = (INodeDirectory)targetNode; 
+      List<INode> contents = dirInode.getChildren();
+      int startChild = dirInode.nextChild(startAfter);
+      int totalNumChildren = contents.size();
+      int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
+      HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
+      for (int i=0; i<numOfListing; i++) {
+        INode cur = contents.get(startChild+i);
         listing[i] = createFileStatus(cur.name, cur);
-        i++;
       }
-      return listing;
+      return new DirectoryListing(
+          listing, totalNumChildren-startChild-numOfListing);
     }
   }
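
The NameNode-side arithmetic caps each response at lsLimit entries and reports how many remain. A worked example under assumed numbers (a directory with 2500 children and the default limit of 1000):

    // Illustrative batching arithmetic, mirroring getListing above.
    int lsLimit = 1000, totalNumChildren = 2500;
    int startChild = 0;
    while (startChild < totalNumChildren) {
      int numOfListing = Math.min(totalNumChildren - startChild, lsLimit);
      int remaining = totalNumChildren - startChild - numOfListing;
      // batch 1: entries 0..999,     remaining 1500
      // batch 2: entries 1000..1999, remaining 500
      // batch 3: entries 2000..2499, remaining 0
      startChild += numOfListing;
    }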
 

+ 9 - 6
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -56,9 +56,9 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -75,7 +75,6 @@ import java.util.Map.Entry;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
-import javax.security.auth.login.LoginException;
 
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
@@ -2056,10 +2055,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   }
 
   /**
-   * Get a listing of all files at 'src'.  The Object[] array
-   * exists so we can return file attributes (soon to be implemented)
+   * Get a partial listing of the indicated directory
+   * 
+   * @param src the directory name
+   * @param startAfter the name to start after
+   * @return a partial listing starting after startAfter 
    */
-  public HdfsFileStatus[] getListing(String src) throws IOException {
+  public DirectoryListing getListing(String src, byte[] startAfter)
+  throws IOException {
     if (isPermissionEnabled) {
       if (dir.isDir(src)) {
         checkPathAccess(src, FsAction.READ_EXECUTE);
@@ -2073,7 +2076,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
                     Server.getRemoteIp(),
                     "listStatus", src, null, null);
     }
-    return dir.getListing(src);
+    return dir.getListing(src, startAfter);
   }
 
   /////////////////////////////////////////////////////////

+ 17 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -238,6 +238,23 @@ class INodeDirectory extends INode {
     return node;
   }
 
+  /**
+   * Given a child's name, return the index of the next child
+   * 
+   * @param name a child's name
+   * @return the index of the next child
+   */
+  int nextChild(byte[] name) {
+    if (name.length == 0) { // empty name
+      return 0;
+    }
+    int nextPos = Collections.binarySearch(children, name) + 1;
+    if (nextPos >= 0) {
+      return nextPos;
+    }
+    return -nextPos;
+  }
+  
   /**
    * Equivalent to addNode(path, newNode, false).
    * @see #addNode(String, INode, boolean)
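
nextChild relies on Collections.binarySearch semantics: a hit returns the index of the match, a miss returns -(insertion point) - 1, and both cases resolve to the first child strictly after the given name. A small worked example with an assumed sorted child list {a, c, e}:

    // name = "c" (present): binarySearch -> 1,  nextPos = 2  >= 0, return 2  (child "e")
    // name = "b" (absent):  binarySearch -> -2, nextPos = -1 <  0, return 1  (child "c")
    // name = ""  (empty):   short-circuits to 0, listing starts at the first child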

+ 25 - 17
src/hdfs/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java

@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -35,7 +36,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -159,24 +159,32 @@ public class ListPathsServlet extends DfsServlet {
       while (!pathstack.empty()) {
         String p = pathstack.pop();
         try {
-          HdfsFileStatus[] listing = nnproxy.getListing(p);
-          if (listing == null) {
-            LOG.warn("ListPathsServlet - Path " + p + " does not exist");
-            continue;
-          }
-          for (HdfsFileStatus i : listing) {
-            String localName = i.getLocalName();
-            if (exclude.matcher(localName).matches()
-                || !filter.matcher(localName).matches()) {
-              continue;
+          byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;         
+          DirectoryListing thisListing;
+          do {
+            assert lastReturnedName != null;
+            thisListing = nnproxy.getListing(p, lastReturnedName);
+            if (thisListing == null) {
+              if (lastReturnedName.length == 0) {
+                LOG.warn("ListPathsServlet - Path " + p + " does not exist");
+              }
+              break;
             }
-            if (recur && i.isDir()) {
-              pathstack.push(new Path(p, localName).toUri().getPath());
+            HdfsFileStatus[] listing = thisListing.getPartialListing();
+            for (HdfsFileStatus i : listing) {
+              String localName = i.getLocalName();
+              if (exclude.matcher(localName).matches()
+                  || !filter.matcher(localName).matches()) {
+                continue;
+              }
+              if (recur && i.isDir()) {
+                pathstack.push(new Path(p, localName).toUri().getPath());
+              }
+              writeInfo(p, i, doc);
             }
-            writeInfo(p, i, doc);
-          }
-        }
-        catch(RemoteException re) {re.writeXml(p, doc);}
+            lastReturnedName = thisListing.getLastName();
+          } while (thisListing.hasMore());
+        } catch(RemoteException re) {re.writeXml(p, doc);}
       }
       if (doc != null) {
         doc.endDocument();

+ 7 - 11
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -37,22 +37,20 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
-import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.Token;
@@ -583,16 +581,14 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     namesystem.renewLease(clientName);        
   }
 
-  /**
-   */
-  public HdfsFileStatus[] getListing(String src) throws IOException {
-    HdfsFileStatus[] files = namesystem.getListing(src);
-    if (files != null) {
-      myMetrics.numGetListingOps.inc();
-    }
+  @Override
+  public DirectoryListing getListing(String src, byte[] startAfter)
+  throws IOException {
+    DirectoryListing files = namesystem.getListing(src, startAfter);
+    myMetrics.numGetListingOps.inc();
     return files;
   }
-
+  
   /**
    * Get the file info for a specific file.
    * @param src The string representation of the path to the file

+ 18 - 12
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -139,12 +140,10 @@ public class NamenodeFsck {
   public void fsck() {
     try {
       Result res = new Result(conf);
+      final HdfsFileStatus file = namenode.getFileInfo(path);
+      if (file != null) {
+        check(path, file, res);
 
-      final HdfsFileStatus[] files = namenode.getListing(path);
-      if (files != null) {
-        for (int i = 0; i < files.length; i++) {
-          check(path, files[i], res);
-        }
         out.println(res);
         out.println(" Number of data-nodes:\t\t" + totalDatanodes);
         out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
@@ -175,17 +174,24 @@ public class NamenodeFsck {
     boolean isOpen = false;
 
     if (file.isDir()) {
-      final HdfsFileStatus[] files = namenode.getListing(path);
-      if (files == null) {
-        return;
-      }
+      byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
+      DirectoryListing thisListing;
       if (showFiles) {
         out.println(path + " <dir>");
       }
       res.totalDirs++;
-      for (int i = 0; i < files.length; i++) {
-        check(path, files[i], res);
-      }
+      do {
+        assert lastReturnedName != null;
+        thisListing = namenode.getListing(path, lastReturnedName);
+        if (thisListing == null) {
+          return;
+        }
+        HdfsFileStatus[] files = thisListing.getPartialListing();
+        for (int i = 0; i < files.length; i++) {
+          check(path, files[i], res);
+        }
+        lastReturnedName = thisListing.getLastName();
+      } while (thisListing.hasMore());
       return;
     }
     long fileLen = file.getLen();

+ 2 - 0
src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

@@ -191,6 +191,8 @@ public class TestDFSClientRetries extends TestCase {
 
     public HdfsFileStatus[] getListing(String src) throws IOException { return null; }
 
+    public DirectoryListing getListing(String src, byte[] startName) throws IOException { return null; }
+
     public void renewLease(String clientName) throws IOException {}
 
     public long[] getStats() throws IOException { return null; }

+ 19 - 19
src/test/org/apache/hadoop/hdfs/TestDFSUpgradeFromImage.java

@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 
 import junit.framework.TestCase;
 import java.io.*;
-import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.TreeMap;
@@ -28,9 +27,10 @@ import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -127,43 +127,43 @@ public class TestDFSUpgradeFromImage extends TestCase {
   
   CRC32 overallChecksum = new CRC32();
   
-  private void verifyDir(DFSClient client, String dir) 
+  private void verifyDir(DistributedFileSystem dfs, Path dir) 
                                            throws IOException {
     
-    HdfsFileStatus[] fileArr = client.listPaths(dir);
-    TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
+    FileStatus[] fileArr = dfs.listStatus(dir);
+    TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
     
-    for(HdfsFileStatus file : fileArr) {
-      String path = file.getFullName(dir);
-      fileMap.put(path, Boolean.valueOf(file.isDir()));
+    for(FileStatus file : fileArr) {
+      fileMap.put(file.getPath(), Boolean.valueOf(file.isDir()));
     }
     
-    for(Iterator<String> it = fileMap.keySet().iterator(); it.hasNext();) {
-      String path = it.next();
+    for(Iterator<Path> it = fileMap.keySet().iterator(); it.hasNext();) {
+      Path path = it.next();
       boolean isDir = fileMap.get(path);
       
-      overallChecksum.update(path.getBytes());
+      String pathName = path.toUri().getPath();
+      overallChecksum.update(pathName.getBytes());
       
       if ( isDir ) {
-        verifyDir(client, path);
+        verifyDir(dfs, path);
       } else {
         // this is not a directory. Checksum the file data.
         CRC32 fileCRC = new CRC32();
-        FSInputStream in = client.open(path);
+        FSInputStream in = dfs.dfs.open(pathName);
         byte[] buf = new byte[4096];
         int nRead = 0;
         while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
           fileCRC.update(buf, 0, nRead);
         }
         
-        verifyChecksum(path, fileCRC.getValue());
+        verifyChecksum(pathName, fileCRC.getValue());
       }
     }
   }
   
-  private void verifyFileSystem(DFSClient client) throws IOException {
+  private void verifyFileSystem(DistributedFileSystem dfs) throws IOException {
   
-    verifyDir(client, "/");
+    verifyDir(dfs, new Path("/"));
     
     verifyChecksum("overallCRC", overallChecksum.getValue());
     
@@ -185,8 +185,8 @@ public class TestDFSUpgradeFromImage extends TestCase {
       cluster = new MiniDFSCluster(0, conf, numDataNodes, false, true,
                                    StartupOption.UPGRADE, null);
       cluster.waitActive();
-      DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
-                                           cluster.getNameNodePort()), conf);
+      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+      DFSClient dfsClient = dfs.dfs;
       //Safemode will be off only after upgrade is complete. Wait for it.
       while ( dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET) ) {
         LOG.info("Waiting for SafeMode to be OFF.");
@@ -195,7 +195,7 @@ public class TestDFSUpgradeFromImage extends TestCase {
         } catch (InterruptedException ignored) {}
       }
 
-      verifyFileSystem(dfsClient);
+      verifyFileSystem(dfs);
     } finally {
       if (cluster != null) { cluster.shutdown(); }
     }

+ 43 - 19
src/test/org/apache/hadoop/hdfs/TestFileStatus.java

@@ -45,10 +45,6 @@ public class TestFileStatus extends TestCase {
   static final int blockSize = 8192;
   static final int fileSize = 16384;
 
-  private static String TEST_ROOT_DIR =
-    new Path(System.getProperty("test.build.data","/tmp"))
-    .toString().replace(' ', '+');
-  
   private void writeFile(FileSystem fileSys, Path name, int repl,
                          int fileSize, int blockSize)
     throws IOException {
@@ -74,6 +70,7 @@ public class TestFileStatus extends TestCase {
    */
   public void testFileStatus() throws IOException {
     Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2);
     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
     FileSystem fs = cluster.getFileSystem();
     final HftpFileSystem hftpfs = cluster.getHftpFileSystem();
@@ -85,8 +82,6 @@ public class TestFileStatus extends TestCase {
       //
       Path path = new Path("/");
       System.out.println("Path : \"" + path.toString() + "\"");
-      System.out.println(fs.isDirectory(path));
-      System.out.println(fs.getFileStatus(path).isDir()); 
       assertTrue("/ should be a directory", 
                  fs.getFileStatus(path).isDir() == true);
       
@@ -125,10 +120,16 @@ public class TestFileStatus extends TestCase {
       assertEquals(fs.makeQualified(file1).toString(), 
           status.getPath().toString());
 
-      // create an empty directory
-      //
-      Path parentDir = new Path("/test");
+      // test file status on a directory
       Path dir = new Path("/test/mkdirs");
+
+      // test listStatus on a non-existent file/directory
+      stats = fs.listStatus(dir);
+      assertEquals(null, stats);
+      status = fs.getFileStatus(dir);
+      assertEquals(null, status);
+      
+      // create the directory
       assertTrue(fs.mkdirs(dir));
       assertTrue(fs.exists(dir));
       System.out.println("Dir : \"" + dir + "\"");
@@ -164,8 +165,8 @@ public class TestFileStatus extends TestCase {
       status = fs.getFileStatus(file2);
       assertTrue(status.getBlockSize() == blockSize);
       assertTrue(status.getReplication() == 1);
-      assertEquals(fs.makeQualified(file2).toString(), 
-          status.getPath().toString());
+      file2 = fs.makeQualified(file2);
+      assertEquals(file2.toString(), status.getPath().toString());
 
       // create another file in the same directory
       Path file3 = new Path(dir, "filestatus3.dat");
@@ -173,7 +174,8 @@ public class TestFileStatus extends TestCase {
       System.out.println("Created file filestatus3.dat with one "
                          + " replicas.");
       checkFile(fs, file3, 1);
-
+      file3 = fs.makeQualified(file3);
+      
       // verify that the size of the directory increased by the size 
       // of the two files
       final int expected = blockSize/2;  
@@ -185,13 +187,35 @@ public class TestFileStatus extends TestCase {
        // test listStatus on a non-empty directory
        stats = fs.listStatus(dir);
        assertEquals(dir + " should have two entries", 2, stats.length);
-       String qualifiedFile2 = fs.makeQualified(file2).toString();
-       String qualifiedFile3 = fs.makeQualified(file3).toString();
-       for(FileStatus stat:stats) {
-         String statusFullName = stat.getPath().toString();
-         assertTrue(qualifiedFile2.equals(statusFullName)
-           || qualifiedFile3.toString().equals(statusFullName));
-       }    } finally {
+       assertEquals(file2.toString(), stats[0].getPath().toString());
+       assertEquals(file3.toString(), stats[1].getPath().toString());
+
+      // test iterative listing
+      // now dir has 2 entries, create one more
+      Path dir3 = fs.makeQualified(new Path(dir, "dir3"));
+      fs.mkdirs(dir3);
+      dir3 = fs.makeQualified(dir3);
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should have three entries", 3, stats.length);
+      assertEquals(dir3.toString(), stats[0].getPath().toString());
+      assertEquals(file2.toString(), stats[1].getPath().toString());
+      assertEquals(file3.toString(), stats[2].getPath().toString());
+
+      // now dir has 3 entries, create two more
+      Path dir4 = fs.makeQualified(new Path(dir, "dir4"));
+      fs.mkdirs(dir4);
+      dir4 = fs.makeQualified(dir4);
+      Path dir5 = fs.makeQualified(new Path(dir, "dir5"));
+      fs.mkdirs(dir5);
+      dir5 = fs.makeQualified(dir5);
+      stats = fs.listStatus(dir);
+      assertEquals(dir + " should have five entries", 5, stats.length);
+      assertEquals(dir3.toString(), stats[0].getPath().toString());
+      assertEquals(dir4.toString(), stats[1].getPath().toString());
+      assertEquals(dir5.toString(), stats[2].getPath().toString());
+      assertEquals(file2.toString(), stats[3].getPath().toString());
+      assertEquals(file3.toString(), stats[4].getPath().toString());
+    } finally {
       fs.close();
       cluster.shutdown();
     }

+ 3 - 2
src/test/org/apache/hadoop/mapred/TestSubmitJob.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -239,7 +240,7 @@ public class TestSubmitJob extends TestCase {
         Path path = new Path(new URI(jt.getSystemDir()).getPath());
         LOG.info("Try listing the mapred-system-dir as the user ("
             + user2.getUserName() + ")");
-        client.getListing(path.toString());
+        client.getListing(path.toString(), HdfsFileStatus.EMPTY_NAME);
       } catch (IOException ioe) {
         failed = true;
       }
@@ -252,7 +253,7 @@ public class TestSubmitJob extends TestCase {
       try {
         LOG.info("Try accessing the job folder for job " + id + " as the user ("
             + user2.getUserName() + ")");
-        client.getListing(jobSubmitDirpath.toString());
+        client.getListing(jobSubmitDirpath.toString(), HdfsFileStatus.EMPTY_NAME);
       } catch (IOException ioe) {
         failed = true;
       }

+ 13 - 3
src/webapps/datanode/browseDirectory.jsp

@@ -76,7 +76,6 @@
         return;
       }
       // directory
-      HdfsFileStatus[] files = dfs.listPaths(target);
       //generate a table and dump the info
       String [] headings = { "Name", "Type", "Size", "Replication", 
                               "Block Size", "Modification Time",
@@ -93,8 +92,17 @@
         out.print("<a href=\"" + req.getRequestURL() + "?dir=" + parent +
                   "&namenodeInfoPort=" + namenodeInfoPort +
                   "\">Go to parent directory</a><br>");
-	
-      if (files == null || files.length == 0) {
+
+      DirectoryListing thisListing;
+      byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
+      do {
+        thisListing = dfs.listPaths(target, lastReturnedName);
+        if (thisListing == null) { // the directory no longer exists
+          if (lastReturnedName.length == 0) {
+            out.print("Empty directory");
+          }
+          break;
+        }
+      HdfsFileStatus[] files = thisListing.getPartialListing();
+      if (files.length == 0 && lastReturnedName.length == 0) {
         out.print("Empty directory");
       }
       else {
@@ -130,6 +138,8 @@
         }
         jspHelper.addTableFooter(out);
       }
+      lastReturnedName = thisListing.getLastName();
+      } while (thisListing.hasMore());
     } 
     String namenodeHost = jspHelper.nameNodeAddr.getHostName();
     out.print("<br><a href=\"http://" +